From f320333f5f58ecc2eabdfd3236e0ba28d946753a Mon Sep 17 00:00:00 2001 From: "Matt Johnson (via Claude)" Date: Sun, 7 Jun 2026 16:03:51 +0000 Subject: [PATCH] refactor(fire_digest): replace LLM renderer with deterministic output Remove _build_prompt, _llm_render, _terse_fallback and all LLM backend references. render_digest() now queries the fires table directly and builds a structured multi-line wire: header with count, up to 5 fires with acres/containment/anchor, and a +N more overflow line. FireDigestScheduler no longer accepts or uses llm_backend. Updated the pipeline __init__.py call site accordingly. Co-Authored-By: Claude Opus 4.6 --- meshai/notifications/pipeline/__init__.py | 2 +- meshai/notifications/scheduled/fire_digest.py | 193 ++++++------------ 2 files changed, 64 insertions(+), 131 deletions(-) diff --git a/meshai/notifications/pipeline/__init__.py b/meshai/notifications/pipeline/__init__.py index 499a846..5b2a244 100644 --- a/meshai/notifications/pipeline/__init__.py +++ b/meshai/notifications/pipeline/__init__.py @@ -239,7 +239,7 @@ async def start_pipeline(bus: EventBus, config) -> DigestScheduler: disp = comps.get("dispatcher") llm_backend = comps.get("llm_backend") if disp is not None: - fd_sched = FireDigestScheduler(disp, llm_backend) + fd_sched = FireDigestScheduler(disp) await fd_sched.start() comps["fire_digest_scheduler"] = fd_sched bus._pipeline_components = comps diff --git a/meshai/notifications/scheduled/fire_digest.py b/meshai/notifications/scheduled/fire_digest.py index 5496803..db69c83 100644 --- a/meshai/notifications/scheduled/fire_digest.py +++ b/meshai/notifications/scheduled/fire_digest.py @@ -1,15 +1,13 @@ """v0.7-fire-tracker-4 fire-digest scheduled broadcaster. -Twice-daily (default 06:00 + 18:00 Mountain) summary of active fires + -the last 24 h of growth / spotting events, rendered by the LLM into a -terse mesh wire and broadcast via dispatcher.dispatch_scheduled_broadcast. +Twice-daily (default 06:00 + 18:00 Mountain) deterministic summary of +active fires, broadcast via dispatcher.dispatch_scheduled_broadcast. Modeled after band_conditions.py (cf. v0.5.11 scheduled broadcaster). """ from __future__ import annotations import asyncio -import json import logging import time from datetime import datetime, timedelta, timezone @@ -47,102 +45,83 @@ def _slot_epoch(now_dt: datetime, hh_mm: str, tz_name: str) -> int: # =========================================================================== -# Data + prompt +# Deterministic renderer # =========================================================================== -def _gather_fire_context(now: int, *, window_h: int = 24): - """Build the LLM-facing data block for the digest prompt.""" +def _get_anchor(lat, lon) -> str: + """Get location anchor for a fire using nearest_town from central_normalizer.""" + if not isinstance(lat, (int, float)) or not isinstance(lon, (int, float)): + return "" + try: + from meshai.central_normalizer import nearest_town + max_mi = float(adapter_config.wfigs.anchor_max_mi) + nt = nearest_town(lat, lon, max_distance_mi=max_mi) + except Exception: + return "" + if nt and nt.get("name"): + town = nt["name"].title() + d = nt.get("distance_mi") + bearing = nt.get("bearing") + if isinstance(d, (int, float)): + if d < 1: + return f"near {town}" + return f"{int(round(d))} mi {bearing or ''} of {town}".strip() + return f"near {town}" + return "" + + +async def render_digest(*, now: Optional[int] = None) -> tuple[str, str]: + """Build the digest wire string deterministically. + + Returns (wire, source). source is 'deterministic' on success, + 'no_fires' if there are no active fires (wire is empty). + """ from meshai.persistence import get_db + + now = now if now is not None else int(time.time()) conn = get_db() - cutoff = now - window_h * 3600 + fires = conn.execute( - "SELECT irwin_id, incident_name, current_acres, " - "current_contained_pct, lat, lon, state, county, last_pass_at " + "SELECT incident_name, current_acres, current_contained_pct, " + "lat, lon, county, state " "FROM fires " "WHERE tombstoned_at IS NULL " "ORDER BY COALESCE(current_acres, 0) DESC LIMIT 20", ).fetchall() + if not fires: - return None + return "", "no_fires" - fire_blocks: list[str] = [] - for f in fires: - passes = conn.execute( - "SELECT pass_id, drift_mi_from_prev, drift_direction, " - "drift_mi_per_hour FROM fire_passes " - "WHERE irwin_id=? AND pass_ended_at >= ? " - "ORDER BY pass_ended_at DESC LIMIT 4", - (f["irwin_id"], cutoff), - ).fetchall() - growth_summary = "no recent passes" - if passes: - drifts = [ - f"{p['drift_mi_from_prev']:.1f}mi {p['drift_direction']}" - for p in passes - if p["drift_mi_from_prev"] is not None - and p["drift_direction"] is not None - ] - if drifts: - growth_summary = "drift " + ", ".join(drifts) - else: - growth_summary = f"{len(passes)} pass(es), no drift recorded" + n = len(fires) + lines: list[str] = [] + lines.append(f"\U0001f525 Fire Digest \u2014 {n} active wildfire(s) in Idaho") - spot_count = conn.execute( - "SELECT COUNT(*) FROM event_log " - "WHERE source='firms' AND category LIKE 'wildfire_spotting%'", - ).fetchone()[0] + for f in fires[:5]: + name = f["incident_name"] or "(unnamed)" + acres = f["current_acres"] + contained = f["current_contained_pct"] - anchor = (f"{f['county']}/{f['state']}" - if f["county"] and f["state"] else "ID") - fire_blocks.append( - f"- {f['incident_name'] or '(unnamed)'} " - f"({f['current_acres'] or 0:.0f} ac, " - f"{f['current_contained_pct'] or 0}% contained, {anchor}); " - f"{growth_summary}" - ) - if not fire_blocks: - return None - return "\n".join(fire_blocks) + acres_str = f"{int(acres):,} ac" if acres and acres > 0 else "size unknown" + contained_str = (f"{int(contained)}% contained" + if contained is not None else "containment unknown") + anchor = _get_anchor(f["lat"], f["lon"]) + if not anchor: + county = f["county"] + state = f["state"] + if county and state: + anchor = f"{county} Co {state}" -def _build_prompt(context_block: str, *, max_chars: int) -> str: - return ( - f"You are a wildfire radio dispatcher writing a single-message " - f"summary for mesh-radio operators in Idaho. You have data on " - f"{context_block.count(chr(10)) + 1} active fires. " - f"Write ONE message of <= {max_chars} characters that names the " - f"top fires, includes any movement direction/speed, and notes " - f"any spotting or possible new fires. Be terse: this is " - f"bandwidth-constrained mesh radio. No markdown, no bullet " - f"points, no greeting, no sign-off. Plain text only.\n\n" - f"DATA:\n{context_block}" - ) + parts = [f"{name}: {acres_str}, {contained_str}"] + if anchor: + parts[0] += f", {anchor}" + lines.append(parts[0]) + if n > 5: + lines.append(f"+ {n - 5} more") -def _terse_fallback(context_block: str, *, max_chars: int) -> str: - """Used when the LLM call fails or no LLM is configured.""" - lines = context_block.splitlines() - fires_n = len(lines) - head = f"Fires today ({fires_n}): " - body_parts: list[str] = [] - for line in lines[:3]: - # line shape: "- ( ac, % contained, anchor); ..." - if line.startswith("- "): - line = line[2:] - # Trim to " ac" - head_part = line.split("(", 1) - if len(head_part) == 2: - name = head_part[0].strip() - rest = head_part[1].split(",", 1)[0] - body_parts.append(f"{name} {rest}") - else: - body_parts.append(line[:40]) - body = "; ".join(body_parts) - if fires_n > 3: - body += f"; +{fires_n - 3} more" - out = head + body - return out[:max_chars] + return "\n".join(lines), "deterministic" # =========================================================================== @@ -169,50 +148,6 @@ def _record_slot_attempt(slot_epoch_s: int, *, return int(cur.lastrowid) if cur.rowcount > 0 else None -async def _llm_render(prompt: str, llm_backend, *, max_chars: int) -> Optional[str]: - """Call the LLM backend and return the trimmed/cleaned wire string.""" - if llm_backend is None: - return None - try: - text = await llm_backend.generate( - messages=[{"role": "user", "content": prompt}], - system_prompt="", - max_tokens=512, - ) - except Exception: - logger.exception("fire_digest: LLM call failed") - return None - if not text: - return None - # Strip markdown the LLM may have added even though prompted not to. - try: - from meshai.chunker import strip_markdown - text = strip_markdown(text) - except Exception: - pass - # Replace newlines with spaces (a digest is a single-line wire). - text = " ".join(text.split()) - return text[:max_chars] - - -async def render_digest(*, llm_backend, now: Optional[int] = None, - max_chars: Optional[int] = None) -> tuple[str, str]: - """Build the digest wire string. Returns (wire, source). source is - 'llm' on success, 'fallback_terse' on LLM failure, 'no_fires' if - there are no active fires (wire is empty in that case).""" - now = now if now is not None else int(time.time()) - cap = (max_chars if max_chars is not None - else int(adapter_config.fires.digest_max_chars)) - ctx = _gather_fire_context(now) - if ctx is None: - return "", "no_fires" - prompt = _build_prompt(ctx, max_chars=cap) - wire = await _llm_render(prompt, llm_backend, max_chars=cap) - if wire: - return wire, "llm" - return _terse_fallback(ctx, max_chars=cap), "fallback_terse" - - # =========================================================================== # Scheduler # =========================================================================== @@ -221,11 +156,10 @@ async def render_digest(*, llm_backend, now: Optional[int] = None, class FireDigestScheduler: """Fires fire-digest broadcasts at configured local times.""" - def __init__(self, dispatcher, llm_backend, *, + def __init__(self, dispatcher, *, clock: Optional[Callable[[], float]] = None, sleep: Optional[Callable[[float], Any]] = None): self._dispatcher = dispatcher - self._llm = llm_backend self._clock = clock or time.time self._sleep = sleep or asyncio.sleep self._task: Optional[asyncio.Task] = None @@ -300,8 +234,7 @@ class FireDigestScheduler: async def fire_slot(self, slot_epoch_s: int, hh_mm: str) -> bool: """Build + broadcast for the given slot. Returns True on broadcast.""" - wire, source = await render_digest(llm_backend=self._llm, - now=int(self._clock())) + wire, source = await render_digest(now=int(self._clock())) if source == "no_fires": self._logger.info( "fire-digest: silent skip for %s (no active fires)", hh_mm)