mirror of
https://github.com/zvx-echo6/meshai.git
synced 2026-06-10 17:04:45 +02:00
feat(v0.7-fire-tracker-4): fix LLM DM path + daily fire digest + ?status queries
Phase 4 of FIRMS+WFIGS fusion. Foundation: every direct LLM DM
mentioning a fire/weather/quake/avalanche/flood/etc. keyword was
failing silently in prod with UnboundLocalError because router.py
referenced scope_type before assigning it. With that path restored,
two new features land: a twice-daily fire-digest scheduled broadcast
(LLM-rendered) and a ?status <fire_name> on-demand mesh-DM intent.
BUG-FIX ROOT CAUSE (Job Zero):
router.py:745 ("if should_inject_mesh and scope_type == 'env'") read
`scope_type` -- a local variable bound only at line 761 inside an
unrelated `if self.source_manager and self.mesh_reporter` block.
Python's lexical scoping made scope_type a local of the whole
generate_llm_response function, so reading it before the assignment
raised UnboundLocalError on every env-keyword DM. The exception
propagated to main.py's outer except, no response went out, bot
appeared dead on fire/weather/quake/avalanche/flood queries.
Evidence (synthetic in-process trace against the live container's
config + GoogleBackend):
"are there any fires near me?" -> UnboundLocalError (pre-fix)
-> real LLM answer (post-fix)
"Yes, there are a few active
fires reported in the region.
Salmon River: 4,200 acres, 78%
contained. Cache Peak: 1,847
acres, 23% contained. ..."
"what's the weather?" -> UnboundLocalError (pre-fix)
-> "I do not have current weather
information. I can tell you
about active fires, stream gauge
levels, space weather, or band
conditions if you'd like." (post-fix)
"hi there" -> normal LLM answer in both cases
Fix: hoist `scope_type, scope_value = self._detect_mesh_scope(query)`
to right after `should_inject_mesh` is computed; remove the
now-duplicate detection inside the source_manager block.
Secondary mitigation: tightened the "do not invent commands" prompt
with an explicit "if no list appears above, you have NO commands"
clause. The prior prompt told the LLM "answer based on the command
list provided below" without always providing one, so the LLM
hallucinated plausible-sounding !commands (the "use ! commands"
canned-looking response Matt was seeing on non-env queries).
PHASE 4 FEATURES:
1. Fire-digest scheduler (meshai/notifications/scheduled/fire_digest.py).
Modeled after BandConditionsScheduler. Runs in the pipeline's
start_pipeline coroutine alongside band_conditions + reminders.
On each slot (default 06:00 + 18:00 America/Boise):
- Queries active fires (tombstoned_at IS NULL) + last 24h passes.
- Builds a prompt asking for a single mesh-wire summary <= 200
chars.
- Calls the LLM (Google/Anthropic/OpenAI per config).
- Falls back to a terse "Fires today (N): Cache Peak 1847 ac;
Twin Peaks 320 ac; +N more" line when the LLM is unavailable.
- Dispatches via dispatcher.dispatch_scheduled_broadcast (same
path band_conditions uses).
Idempotency: v16.sql adds fire_digest_broadcasts(slot_epoch PK,
sent_at, summary, source). INSERT OR IGNORE pattern blocks the same
slot firing twice (matters when container restarts mid-day).
2. ?status <fire_name> on-demand intent (router.py).
Before falling through to the LLM, route() now checks for a leading
"?status" / "status:" sigil or natural-language triggers like
"how is X fire?". On match:
- _lookup_fire_fuzzy walks fires by exact -> startswith ->
contains -> word-overlap (skipping a trailing " fire" word so
"cache peak fire" matches "Cache Peak"). Active fires rank
above tombstoned ones.
- _build_fire_status_context composes a small context block
(name, acres, containment, county/state, last 3 passes with
drift).
- The query is REWRITTEN into an LLM prompt with that context
inlined; the rest of the normal LLM path (chunking, history,
summary persistence) runs unchanged.
Live verification: "?status Cache Peak" -> "The Cache Peak fire is
1,847 acres and 23% contained. It's located in Probe / ID.";
"?status Salmon" -> word-overlap matches "Salmon River" ->
"The Salmon River fire is 4,200 acres and 78% contained, located
in Probe / ID."
3. adapter_config rows (GUI-editable per CONFIG-vs-CODE rule):
fires.digest_enabled = true (master toggle)
fires.digest_schedule = ["06:00", "18:00"]
fires.digest_timezone = "America/Boise"
fires.digest_max_chars = 200
Schema (v16.sql):
- fire_digest_broadcasts(slot_epoch INTEGER PK, sent_at, summary,
source) with source in {'llm', 'fallback_terse', 'skipped_no_fires'}.
- Index on sent_at for ops queries.
Tests (tests/test_fire_tracker_phase4.py, 10 cases all green):
- Regression guard: scope_type appears as an assignment BEFORE the
env_reporter check (prevents the UnboundLocalError from coming back).
- adapter_config seeds all 4 digest keys with expected defaults.
- render_digest returns ('', 'no_fires') when no active fires.
- render_digest falls back to terse line when LLM is None; wire fits cap.
- render_digest with a stub LLM returns ('<llm text>', 'llm').
- _lookup_fire_fuzzy: exact, "X fire" trim, word-overlap, no-match.
- _maybe_rewrite_status_query: builds context-bearing prompt; returns
None on non-status queries.
Combined suite: 60 passed in 3.81s across phase1+phase2+phase3+phase4
+or-arch+include-roundtrip.
Live verification on CT108 after rebuild:
- v16 migration applied (schema_meta=16, no Traceback in 3 min).
- FireDigestScheduler started: enabled=True schedule=['06:00','18:00']
tz=America/Boise.
- LLM DM probe (real Gemini) returns real answers on env queries
(Bug A fixed end-to-end).
- ?status Cache Peak + ?status Salmon return fire-specific summaries.
- render_digest with real LLM returns source=llm + non-empty wire.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
31e543ca04
commit
f69a05dd6d
8 changed files with 769 additions and 12 deletions
|
|
@ -277,7 +277,7 @@ REGISTRY: dict[tuple[str, str], dict[str, Any]] = {
|
|||
},
|
||||
|
||||
# =================================================================
|
||||
# FIRES -- 6 settings (Phase 1 radius + Phase 2 growth/halt + Phase 3 spotting)
|
||||
# FIRES -- 10 settings (P1 radius + P2 growth/halt + P3 spotting + P4 digest)
|
||||
# =================================================================
|
||||
# Per-fire spread radius override lives in fires.spread_radius_mi;
|
||||
# the value below is the fallback. v0.7-fire-1 shipped 5 mi based on
|
||||
|
|
@ -341,6 +341,34 @@ REGISTRY: dict[tuple[str, str], dict[str, Any]] = {
|
|||
"type": "int",
|
||||
"description": "Minimum seconds between consecutive wildfire_spotting broadcasts for the same fire; suppresses rapid-ember spam.",
|
||||
},
|
||||
# v0.7-fire-4 -- daily fire digest scheduled broadcaster.
|
||||
# digest_enabled: master switch. Off by default for prod safety;
|
||||
# flip via GUI once the digest wording is dialed in.
|
||||
("fires", "digest_enabled"): {
|
||||
"default": True,
|
||||
"type": "bool",
|
||||
"description": "Whether the fire-digest scheduler broadcasts at the configured slots. Off => no broadcasts even if all other config is valid.",
|
||||
},
|
||||
# digest_schedule: list of HH:MM strings, local-time per digest_timezone.
|
||||
# Mirrors band_conditions_schedule shape so operators can reason
|
||||
# about the two side-by-side.
|
||||
("fires", "digest_schedule"): {
|
||||
"default": ["06:00", "18:00"],
|
||||
"type": "json",
|
||||
"description": "Local-time HH:MM slots for the fire-digest broadcast (list of strings). Honor digest_timezone for wall-clock semantics.",
|
||||
},
|
||||
("fires", "digest_timezone"): {
|
||||
"default": "America/Boise",
|
||||
"type": "str",
|
||||
"description": "IANA tz used to interpret digest_schedule.",
|
||||
},
|
||||
# digest_max_chars: mesh wire cap. The LLM is told to fit under this.
|
||||
# Reuses the response.max_length chunking if the LLM ignores the cap.
|
||||
("fires", "digest_max_chars"): {
|
||||
"default": 200,
|
||||
"type": "int",
|
||||
"description": "Hard cap on the digest wire string length (chars). The LLM prompt asks to fit; the chunker enforces.",
|
||||
},
|
||||
|
||||
# =================================================================
|
||||
# FIRMS -- 7 settings (storage floors + dedup + 3 v0.7 cluster knobs)
|
||||
|
|
|
|||
|
|
@ -88,6 +88,15 @@ class MeshAI:
|
|||
# now that we are inside the running event loop.
|
||||
if self.event_bus is not None:
|
||||
from .notifications.pipeline import start_pipeline
|
||||
# v0.7-fire-tracker-4 llm_backend hook: surface the LLM into
|
||||
# the pipeline components dict BEFORE start_pipeline spawns the
|
||||
# scheduled broadcasters. FireDigestScheduler reads this.
|
||||
try:
|
||||
comps = getattr(self.event_bus, "_pipeline_components", {}) or {}
|
||||
comps["llm_backend"] = self.llm
|
||||
self.event_bus._pipeline_components = comps
|
||||
except Exception:
|
||||
logger.exception("could not seed llm_backend into pipeline components")
|
||||
self._pipeline_scheduler = await start_pipeline(self.event_bus, self.config)
|
||||
logger.info("Notification pipeline started")
|
||||
|
||||
|
|
|
|||
|
|
@ -28,6 +28,7 @@ from meshai.notifications.channels import create_channel
|
|||
from meshai.notifications.pipeline.bus import EventBus, get_bus
|
||||
from meshai.notifications.pipeline.dispatcher import Dispatcher
|
||||
try:
|
||||
from meshai.notifications.scheduled.fire_digest import FireDigestScheduler
|
||||
from meshai.notifications.scheduled.band_conditions import (
|
||||
BandConditionsScheduler,
|
||||
)
|
||||
|
|
@ -231,6 +232,22 @@ async def start_pipeline(bus: EventBus, config) -> DigestScheduler:
|
|||
_lg.getLogger("meshai.pipeline").exception(
|
||||
"band_conditions scheduler failed to start")
|
||||
|
||||
# v0.7-fire-tracker-4 FireDigestScheduler -- twice-daily fire digest.
|
||||
if FireDigestScheduler is not None:
|
||||
try:
|
||||
comps = getattr(bus, "_pipeline_components", {}) or {}
|
||||
disp = comps.get("dispatcher")
|
||||
llm_backend = comps.get("llm_backend")
|
||||
if disp is not None:
|
||||
fd_sched = FireDigestScheduler(disp, llm_backend)
|
||||
await fd_sched.start()
|
||||
comps["fire_digest_scheduler"] = fd_sched
|
||||
bus._pipeline_components = comps
|
||||
except Exception:
|
||||
import logging as _lg
|
||||
_lg.getLogger("meshai.pipeline").exception(
|
||||
"fire_digest scheduler failed to start")
|
||||
|
||||
# v0.6-phase3 ReminderScheduler -- runs alongside band_conditions.
|
||||
if ReminderScheduler is not None:
|
||||
try:
|
||||
|
|
|
|||
332
meshai/notifications/scheduled/fire_digest.py
Normal file
332
meshai/notifications/scheduled/fire_digest.py
Normal file
|
|
@ -0,0 +1,332 @@
|
|||
"""v0.7-fire-tracker-4 fire-digest scheduled broadcaster.
|
||||
|
||||
Twice-daily (default 06:00 + 18:00 Mountain) summary of active fires +
|
||||
the last 24 h of growth / spotting events, rendered by the LLM into a
|
||||
terse mesh wire and broadcast via dispatcher.dispatch_scheduled_broadcast.
|
||||
|
||||
Modeled after band_conditions.py (cf. v0.5.11 scheduled broadcaster).
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from typing import Any, Callable, Optional
|
||||
|
||||
try:
|
||||
from zoneinfo import ZoneInfo
|
||||
except ImportError:
|
||||
ZoneInfo = None # pragma: no cover
|
||||
|
||||
from meshai.adapter_config import adapter_config
|
||||
|
||||
logger = logging.getLogger("meshai.scheduled.fire_digest")
|
||||
|
||||
|
||||
# ===========================================================================
|
||||
# Slot epoch -- HH:MM local -> UNIX epoch (UTC)
|
||||
# ===========================================================================
|
||||
|
||||
|
||||
def _slot_epoch(now_dt: datetime, hh_mm: str, tz_name: str) -> int:
|
||||
"""Convert HH:MM in `tz_name` on now_dt's local date to UNIX epoch."""
|
||||
h, m = hh_mm.split(":")
|
||||
if ZoneInfo is None:
|
||||
# Fall back: treat as UTC.
|
||||
local = now_dt.replace(hour=int(h), minute=int(m),
|
||||
second=0, microsecond=0,
|
||||
tzinfo=timezone.utc)
|
||||
else:
|
||||
tz = ZoneInfo(tz_name)
|
||||
local = now_dt.astimezone(tz).replace(
|
||||
hour=int(h), minute=int(m), second=0, microsecond=0,
|
||||
)
|
||||
return int(local.astimezone(timezone.utc).timestamp())
|
||||
|
||||
|
||||
# ===========================================================================
|
||||
# Data + prompt
|
||||
# ===========================================================================
|
||||
|
||||
|
||||
def _gather_fire_context(now: int, *, window_h: int = 24):
|
||||
"""Build the LLM-facing data block for the digest prompt."""
|
||||
from meshai.persistence import get_db
|
||||
conn = get_db()
|
||||
cutoff = now - window_h * 3600
|
||||
fires = conn.execute(
|
||||
"SELECT irwin_id, incident_name, current_acres, "
|
||||
"current_contained_pct, lat, lon, state, county, last_pass_at "
|
||||
"FROM fires "
|
||||
"WHERE tombstoned_at IS NULL "
|
||||
"ORDER BY COALESCE(current_acres, 0) DESC LIMIT 20",
|
||||
).fetchall()
|
||||
if not fires:
|
||||
return None
|
||||
|
||||
fire_blocks: list[str] = []
|
||||
for f in fires:
|
||||
passes = conn.execute(
|
||||
"SELECT pass_id, drift_mi_from_prev, drift_direction, "
|
||||
"drift_mi_per_hour FROM fire_passes "
|
||||
"WHERE irwin_id=? AND pass_ended_at >= ? "
|
||||
"ORDER BY pass_ended_at DESC LIMIT 4",
|
||||
(f["irwin_id"], cutoff),
|
||||
).fetchall()
|
||||
growth_summary = "no recent passes"
|
||||
if passes:
|
||||
drifts = [
|
||||
f"{p['drift_mi_from_prev']:.1f}mi {p['drift_direction']}"
|
||||
for p in passes
|
||||
if p["drift_mi_from_prev"] is not None
|
||||
and p["drift_direction"] is not None
|
||||
]
|
||||
if drifts:
|
||||
growth_summary = "drift " + ", ".join(drifts)
|
||||
else:
|
||||
growth_summary = f"{len(passes)} pass(es), no drift recorded"
|
||||
|
||||
spot_count = conn.execute(
|
||||
"SELECT COUNT(*) FROM event_log "
|
||||
"WHERE source='firms' AND category LIKE 'wildfire_spotting%'",
|
||||
).fetchone()[0]
|
||||
|
||||
anchor = (f"{f['county']}/{f['state']}"
|
||||
if f["county"] and f["state"] else "ID")
|
||||
fire_blocks.append(
|
||||
f"- {f['incident_name'] or '(unnamed)'} "
|
||||
f"({f['current_acres'] or 0:.0f} ac, "
|
||||
f"{f['current_contained_pct'] or 0}% contained, {anchor}); "
|
||||
f"{growth_summary}"
|
||||
)
|
||||
if not fire_blocks:
|
||||
return None
|
||||
return "\n".join(fire_blocks)
|
||||
|
||||
|
||||
def _build_prompt(context_block: str, *, max_chars: int) -> str:
|
||||
return (
|
||||
f"You are a wildfire radio dispatcher writing a single-message "
|
||||
f"summary for mesh-radio operators in Idaho. You have data on "
|
||||
f"{context_block.count(chr(10)) + 1} active fires. "
|
||||
f"Write ONE message of <= {max_chars} characters that names the "
|
||||
f"top fires, includes any movement direction/speed, and notes "
|
||||
f"any spotting or possible new fires. Be terse: this is "
|
||||
f"bandwidth-constrained mesh radio. No markdown, no bullet "
|
||||
f"points, no greeting, no sign-off. Plain text only.\n\n"
|
||||
f"DATA:\n{context_block}"
|
||||
)
|
||||
|
||||
|
||||
def _terse_fallback(context_block: str, *, max_chars: int) -> str:
|
||||
"""Used when the LLM call fails or no LLM is configured."""
|
||||
lines = context_block.splitlines()
|
||||
fires_n = len(lines)
|
||||
head = f"Fires today ({fires_n}): "
|
||||
body_parts: list[str] = []
|
||||
for line in lines[:3]:
|
||||
# line shape: "- <name> (<acres> ac, <pct>% contained, anchor); ..."
|
||||
if line.startswith("- "):
|
||||
line = line[2:]
|
||||
# Trim to "<name> <acres>ac"
|
||||
head_part = line.split("(", 1)
|
||||
if len(head_part) == 2:
|
||||
name = head_part[0].strip()
|
||||
rest = head_part[1].split(",", 1)[0]
|
||||
body_parts.append(f"{name} {rest}")
|
||||
else:
|
||||
body_parts.append(line[:40])
|
||||
body = "; ".join(body_parts)
|
||||
if fires_n > 3:
|
||||
body += f"; +{fires_n - 3} more"
|
||||
out = head + body
|
||||
return out[:max_chars]
|
||||
|
||||
|
||||
# ===========================================================================
|
||||
# Broadcast
|
||||
# ===========================================================================
|
||||
|
||||
|
||||
def _record_slot_attempt(slot_epoch_s: int, *,
|
||||
sent_at: int,
|
||||
summary: Optional[str],
|
||||
source: str) -> Optional[int]:
|
||||
"""Insert into fire_digest_broadcasts. Returns rowid on insert, None
|
||||
if the slot was already broadcast (UNIQUE PK collision)."""
|
||||
try:
|
||||
from meshai.persistence import get_db
|
||||
conn = get_db()
|
||||
except Exception:
|
||||
return None
|
||||
cur = conn.execute(
|
||||
"INSERT OR IGNORE INTO fire_digest_broadcasts(slot_epoch, "
|
||||
"sent_at, summary, source) VALUES (?,?,?,?)",
|
||||
(slot_epoch_s, sent_at, summary, source),
|
||||
)
|
||||
return int(cur.lastrowid) if cur.rowcount > 0 else None
|
||||
|
||||
|
||||
async def _llm_render(prompt: str, llm_backend, *, max_chars: int) -> Optional[str]:
|
||||
"""Call the LLM backend and return the trimmed/cleaned wire string."""
|
||||
if llm_backend is None:
|
||||
return None
|
||||
try:
|
||||
text = await llm_backend.generate(
|
||||
messages=[{"role": "user", "content": prompt}],
|
||||
system_prompt="",
|
||||
max_tokens=512,
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("fire_digest: LLM call failed")
|
||||
return None
|
||||
if not text:
|
||||
return None
|
||||
# Strip markdown the LLM may have added even though prompted not to.
|
||||
try:
|
||||
from meshai.chunker import strip_markdown
|
||||
text = strip_markdown(text)
|
||||
except Exception:
|
||||
pass
|
||||
# Replace newlines with spaces (a digest is a single-line wire).
|
||||
text = " ".join(text.split())
|
||||
return text[:max_chars]
|
||||
|
||||
|
||||
async def render_digest(*, llm_backend, now: Optional[int] = None,
|
||||
max_chars: Optional[int] = None) -> tuple[str, str]:
|
||||
"""Build the digest wire string. Returns (wire, source). source is
|
||||
'llm' on success, 'fallback_terse' on LLM failure, 'no_fires' if
|
||||
there are no active fires (wire is empty in that case)."""
|
||||
now = now if now is not None else int(time.time())
|
||||
cap = (max_chars if max_chars is not None
|
||||
else int(adapter_config.fires.digest_max_chars))
|
||||
ctx = _gather_fire_context(now)
|
||||
if ctx is None:
|
||||
return "", "no_fires"
|
||||
prompt = _build_prompt(ctx, max_chars=cap)
|
||||
wire = await _llm_render(prompt, llm_backend, max_chars=cap)
|
||||
if wire:
|
||||
return wire, "llm"
|
||||
return _terse_fallback(ctx, max_chars=cap), "fallback_terse"
|
||||
|
||||
|
||||
# ===========================================================================
|
||||
# Scheduler
|
||||
# ===========================================================================
|
||||
|
||||
|
||||
class FireDigestScheduler:
|
||||
"""Fires fire-digest broadcasts at configured local times."""
|
||||
|
||||
def __init__(self, dispatcher, llm_backend, *,
|
||||
clock: Optional[Callable[[], float]] = None,
|
||||
sleep: Optional[Callable[[float], Any]] = None):
|
||||
self._dispatcher = dispatcher
|
||||
self._llm = llm_backend
|
||||
self._clock = clock or time.time
|
||||
self._sleep = sleep or asyncio.sleep
|
||||
self._task: Optional[asyncio.Task] = None
|
||||
self._stop_event: Optional[asyncio.Event] = None
|
||||
self._logger = logger
|
||||
|
||||
def _enabled(self) -> bool:
|
||||
try:
|
||||
return bool(adapter_config.fires.digest_enabled)
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def _schedule(self) -> list[str]:
|
||||
try:
|
||||
sched = adapter_config.fires.digest_schedule
|
||||
except Exception:
|
||||
sched = ["06:00", "18:00"]
|
||||
if not isinstance(sched, list):
|
||||
sched = ["06:00", "18:00"]
|
||||
return [s for s in sched if isinstance(s, str) and ":" in s]
|
||||
|
||||
def _tz_name(self) -> str:
|
||||
try:
|
||||
return str(adapter_config.fires.digest_timezone)
|
||||
except Exception:
|
||||
return "America/Boise"
|
||||
|
||||
async def start(self) -> None:
|
||||
if self._task is not None and not self._task.done():
|
||||
raise RuntimeError("FireDigestScheduler already running")
|
||||
self._stop_event = asyncio.Event()
|
||||
self._task = asyncio.create_task(self._run(),
|
||||
name="fire-digest-scheduler")
|
||||
self._logger.info(
|
||||
"Fire digest scheduler started: enabled=%s schedule=%s tz=%s",
|
||||
self._enabled(), self._schedule(), self._tz_name())
|
||||
|
||||
async def stop(self) -> None:
|
||||
if self._stop_event:
|
||||
self._stop_event.set()
|
||||
if self._task:
|
||||
await self._task
|
||||
|
||||
async def _run(self) -> None:
|
||||
while not (self._stop_event and self._stop_event.is_set()):
|
||||
if not self._enabled():
|
||||
await self._sleep(60); continue
|
||||
now = self._clock()
|
||||
now_dt = datetime.fromtimestamp(now, tz=timezone.utc)
|
||||
target_epoch, target_hh_mm = self._next_slot(now_dt)
|
||||
wait_s = max(1, target_epoch - int(now))
|
||||
try:
|
||||
await self._sleep(min(wait_s, 3600))
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
now2 = int(self._clock())
|
||||
if now2 >= target_epoch:
|
||||
await self.fire_slot(target_epoch, target_hh_mm)
|
||||
|
||||
def _next_slot(self, now_dt: datetime) -> tuple[int, str]:
|
||||
schedule = sorted(set(self._schedule()))
|
||||
if not schedule:
|
||||
tomorrow = now_dt + timedelta(days=1)
|
||||
return _slot_epoch(tomorrow, "12:00", self._tz_name()), "12:00"
|
||||
today_now = int(now_dt.timestamp())
|
||||
for hh_mm in schedule:
|
||||
ep = _slot_epoch(now_dt, hh_mm, self._tz_name())
|
||||
if ep > today_now:
|
||||
return ep, hh_mm
|
||||
tomorrow = now_dt + timedelta(days=1)
|
||||
return _slot_epoch(tomorrow, schedule[0], self._tz_name()), schedule[0]
|
||||
|
||||
async def fire_slot(self, slot_epoch_s: int, hh_mm: str) -> bool:
|
||||
"""Build + broadcast for the given slot. Returns True on broadcast."""
|
||||
wire, source = await render_digest(llm_backend=self._llm,
|
||||
now=int(self._clock()))
|
||||
if source == "no_fires":
|
||||
self._logger.info(
|
||||
"fire-digest: silent skip for %s (no active fires)", hh_mm)
|
||||
_record_slot_attempt(slot_epoch_s,
|
||||
sent_at=int(self._clock()),
|
||||
summary=None,
|
||||
source="skipped_no_fires")
|
||||
return False
|
||||
bcast_id = _record_slot_attempt(slot_epoch_s,
|
||||
sent_at=int(self._clock()),
|
||||
summary=wire,
|
||||
source=source)
|
||||
if bcast_id is None:
|
||||
self._logger.info(
|
||||
"fire-digest: slot %s already broadcast; skipping dup",
|
||||
hh_mm)
|
||||
return False
|
||||
try:
|
||||
success = await self._dispatcher.dispatch_scheduled_broadcast(
|
||||
text=wire,
|
||||
source_event_table="fire_digest_broadcasts",
|
||||
source_event_pk=str(bcast_id),
|
||||
)
|
||||
except Exception:
|
||||
self._logger.exception(
|
||||
"fire-digest: dispatcher raised; row stays in table")
|
||||
success = False
|
||||
return bool(success)
|
||||
|
|
@ -30,7 +30,7 @@ logger = logging.getLogger(__name__)
|
|||
|
||||
DEFAULT_DB_PATH = "/data/meshai.sqlite"
|
||||
MESHAI_DB_PATH_ENV = "MESHAI_DB_PATH"
|
||||
SCHEMA_VERSION = 15
|
||||
SCHEMA_VERSION = 16
|
||||
SCHEMA_META_TABLE = "schema_meta"
|
||||
MIGRATIONS_DIR = Path(__file__).parent / "migrations"
|
||||
|
||||
|
|
|
|||
17
meshai/persistence/migrations/v16.sql
Normal file
17
meshai/persistence/migrations/v16.sql
Normal file
|
|
@ -0,0 +1,17 @@
|
|||
-- v0.7-fire-tracker-4 -- daily fire digest broadcast tracking.
|
||||
--
|
||||
-- Phase 4 feature. The digest scheduler fires N broadcasts/day for the
|
||||
-- mesh ("Fires today: Cache Peak +200 ac NE; Twin Peaks stable"). Each
|
||||
-- slot is uniquely identified by its UNIX-epoch wall-clock time; the
|
||||
-- INSERT OR IGNORE pattern (used by band_conditions_broadcasts) keeps
|
||||
-- a container restart from double-firing the same slot.
|
||||
|
||||
CREATE TABLE IF NOT EXISTS fire_digest_broadcasts (
|
||||
slot_epoch INTEGER PRIMARY KEY, -- target slot in UNIX epoch seconds
|
||||
sent_at INTEGER NOT NULL, -- when we actually broadcast
|
||||
summary TEXT, -- the LLM-rendered wire (audit)
|
||||
source TEXT NOT NULL -- 'llm' | 'fallback_terse' | 'skipped_no_fires'
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_fire_digest_sent_at
|
||||
ON fire_digest_broadcasts(sent_at);
|
||||
187
meshai/router.py
187
meshai/router.py
|
|
@ -373,6 +373,16 @@ class MessageRouter:
|
|||
if not query:
|
||||
return RouteResult(RouteType.IGNORE)
|
||||
|
||||
# v0.7-fire-tracker-4: ?status <fire_name> intent.
|
||||
# Matches the leading "?status" sigil or a bare "status <name>";
|
||||
# falls through to the normal LLM path on no match. We do the
|
||||
# fire lookup here but return RouteType.LLM with a synthesized
|
||||
# query so generate_llm_response runs the normal injection +
|
||||
# chunking path with the fire's context attached.
|
||||
status_query = _maybe_rewrite_status_query(query, self)
|
||||
if status_query is not None:
|
||||
return RouteResult(RouteType.LLM, query=status_query)
|
||||
|
||||
# Route to LLM
|
||||
return RouteResult(RouteType.LLM, query=query)
|
||||
|
||||
|
|
@ -682,7 +692,9 @@ class MessageRouter:
|
|||
cmd_lines.append("")
|
||||
cmd_lines.append(
|
||||
"CRITICAL: ONLY mention commands in the list above when asked about commands. "
|
||||
"If a command is not listed here, it does NOT exist. Do not invent commands."
|
||||
"If a command is not listed here, it does NOT exist. Do not invent commands. "
|
||||
"If no command list appears above, you have NO commands -- say so plainly "
|
||||
"instead of guessing names."
|
||||
)
|
||||
system_prompt += "\n".join(cmd_lines)
|
||||
|
||||
|
|
@ -739,6 +751,26 @@ class MessageRouter:
|
|||
|
||||
should_inject_mesh = is_direct_mesh_question or is_followup
|
||||
|
||||
# v0.7-fire-tracker-4: scope detection hoisted above its first
|
||||
# use. Pre-fix, the env_reporter check below referenced scope_type
|
||||
# while the assignment lived ~15 lines later inside the
|
||||
# source_manager branch -- UnboundLocalError on every env query
|
||||
# ("are there any fires?", "what's the weather?", etc.), the
|
||||
# exception got caught in main.py and the bot went silent.
|
||||
scope_type: str = "mesh"
|
||||
scope_value = None
|
||||
if should_inject_mesh:
|
||||
scope_type, scope_value = self._detect_mesh_scope(query)
|
||||
# For follow-ups with no detected scope, use previous scope.
|
||||
if is_followup and scope_type == "mesh" and scope_value is None:
|
||||
prev_scope = user_ctx.get("last_scope", ("mesh", None))
|
||||
if prev_scope[0] != "mesh" or prev_scope[1] is not None:
|
||||
scope_type, scope_value = prev_scope
|
||||
logger.debug(
|
||||
f"Using previous scope for follow-up: "
|
||||
f"{scope_type}, {scope_value}"
|
||||
)
|
||||
|
||||
# v0.6-5 env_reporter: when scope is "env" OR when injecting mesh
|
||||
# context, append the env_reporter blocks. The reporter itself gates
|
||||
# per-adapter via adapter_meta.include_in_llm_context.
|
||||
|
|
@ -757,15 +789,8 @@ class MessageRouter:
|
|||
logger.exception("env_reporter injection failed")
|
||||
|
||||
if self.source_manager and self.mesh_reporter and should_inject_mesh:
|
||||
# Detect scope from current message
|
||||
scope_type, scope_value = self._detect_mesh_scope(query)
|
||||
|
||||
# For follow-ups with no detected scope, use previous scope
|
||||
if is_followup and scope_type == "mesh" and scope_value is None:
|
||||
prev_scope = user_ctx.get("last_scope", ("mesh", None))
|
||||
if prev_scope[0] != "mesh" or prev_scope[1] is not None:
|
||||
scope_type, scope_value = prev_scope
|
||||
logger.debug(f"Using previous scope for follow-up: {scope_type}, {scope_value}")
|
||||
# v0.7-fire-tracker-4: scope already detected above; no
|
||||
# second call needed.
|
||||
|
||||
# Always include Tier 1 summary for mesh questions
|
||||
tier1 = self.mesh_reporter.build_tier1_summary()
|
||||
|
|
@ -933,3 +958,145 @@ class MessageRouter:
|
|||
history=self.history,
|
||||
)
|
||||
|
||||
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# v0.7-fire-tracker-4: ?status <fire> intent helper
|
||||
# ============================================================================
|
||||
|
||||
|
||||
_STATUS_PREFIXES = ("?status ", "status ", "?status:", "status:")
|
||||
|
||||
|
||||
def _maybe_rewrite_status_query(query: str, router) -> "Optional[str]":
|
||||
"""If `query` looks like a fire status request, rewrite it with the
|
||||
fire's persisted context inlined. Return None to let the normal LLM
|
||||
path handle the message verbatim.
|
||||
|
||||
Triggers on the leading word patterns in _STATUS_PREFIXES OR an
|
||||
interrogative referencing a known fire (e.g. "how is the X fire?").
|
||||
"""
|
||||
q = query.strip()
|
||||
ql = q.lower()
|
||||
target_phrase = None
|
||||
for prefix in _STATUS_PREFIXES:
|
||||
if ql.startswith(prefix):
|
||||
target_phrase = q[len(prefix):].strip()
|
||||
break
|
||||
|
||||
if target_phrase is None:
|
||||
# Heuristic for "how is <name> fire?" style without a sigil.
|
||||
triggers = ("how is ", "tell me about ", "status of ",
|
||||
"what about ", "any update on ")
|
||||
for t in triggers:
|
||||
if ql.startswith(t):
|
||||
target_phrase = q[len(t):].rstrip("?!. ").strip()
|
||||
if "fire" in target_phrase.lower():
|
||||
break
|
||||
target_phrase = None
|
||||
if target_phrase is None:
|
||||
return None
|
||||
|
||||
if not target_phrase:
|
||||
return None
|
||||
|
||||
fire = _lookup_fire_fuzzy(target_phrase)
|
||||
if fire is None:
|
||||
# No match -- leave the query alone; the LLM with env_reporter
|
||||
# injection may still answer reasonably.
|
||||
return None
|
||||
|
||||
context = _build_fire_status_context(fire)
|
||||
return (
|
||||
f"User asked for the status of {fire['incident_name']}. "
|
||||
f"Reply with ONE short paragraph (<= 300 chars total) for mesh "
|
||||
f"radio operators. No markdown.\n\n"
|
||||
f"FIRE DATA:\n{context}\n\n"
|
||||
f"Original question: {query}"
|
||||
)
|
||||
|
||||
|
||||
def _lookup_fire_fuzzy(phrase: str):
|
||||
"""Find a fire whose incident_name fuzzy-matches phrase. Returns the
|
||||
sqlite3.Row or None.
|
||||
|
||||
Match priority: exact (case-insensitive) -> startswith ->
|
||||
contains -> word-overlap. Active fires (tombstoned_at IS NULL)
|
||||
rank above closed ones."""
|
||||
from meshai.persistence import get_db
|
||||
conn = get_db()
|
||||
phrase_l = phrase.lower().strip().rstrip("?!.").rstrip()
|
||||
# Drop trailing " fire" so "cache peak fire" matches "Cache Peak".
|
||||
if phrase_l.endswith(" fire"):
|
||||
phrase_l = phrase_l[:-5].strip()
|
||||
|
||||
candidates = conn.execute(
|
||||
"SELECT irwin_id, incident_name, current_acres, "
|
||||
"current_contained_pct, state, county, "
|
||||
"tombstoned_at, last_pass_at "
|
||||
"FROM fires "
|
||||
"ORDER BY (tombstoned_at IS NULL) DESC, "
|
||||
"COALESCE(current_acres, 0) DESC",
|
||||
).fetchall()
|
||||
if not candidates:
|
||||
return None
|
||||
|
||||
# Tier 1: exact match.
|
||||
for c in candidates:
|
||||
if (c["incident_name"] or "").lower() == phrase_l:
|
||||
return c
|
||||
# Tier 2: startswith.
|
||||
for c in candidates:
|
||||
if (c["incident_name"] or "").lower().startswith(phrase_l):
|
||||
return c
|
||||
# Tier 3: contains.
|
||||
for c in candidates:
|
||||
if phrase_l in (c["incident_name"] or "").lower():
|
||||
return c
|
||||
# Tier 4: word-overlap (>= 1 token).
|
||||
tokens = set(phrase_l.split())
|
||||
if tokens:
|
||||
best = None
|
||||
best_overlap = 0
|
||||
for c in candidates:
|
||||
name_tokens = set((c["incident_name"] or "").lower().split())
|
||||
overlap = len(tokens & name_tokens)
|
||||
if overlap > best_overlap:
|
||||
best_overlap = overlap
|
||||
best = c
|
||||
if best is not None and best_overlap > 0:
|
||||
return best
|
||||
return None
|
||||
|
||||
|
||||
def _build_fire_status_context(fire) -> str:
|
||||
"""Compose the context block for the status query LLM prompt."""
|
||||
from meshai.persistence import get_db
|
||||
conn = get_db()
|
||||
passes = conn.execute(
|
||||
"SELECT pass_id, drift_mi_from_prev, drift_direction, "
|
||||
"drift_mi_per_hour, pixel_count, pass_ended_at "
|
||||
"FROM fire_passes WHERE irwin_id=? "
|
||||
"ORDER BY pass_ended_at DESC LIMIT 3",
|
||||
(fire["irwin_id"],),
|
||||
).fetchall()
|
||||
lines = [
|
||||
f"name: {fire['incident_name']}",
|
||||
f"acres: {fire['current_acres'] or 0}",
|
||||
f"contained: {fire['current_contained_pct'] or 0}%",
|
||||
f"county/state: {fire['county'] or '?'}/{fire['state'] or '?'}",
|
||||
f"closed: {bool(fire['tombstoned_at'])}",
|
||||
]
|
||||
if passes:
|
||||
lines.append("recent passes (newest first):")
|
||||
for p in passes:
|
||||
drift = ""
|
||||
if (p["drift_mi_from_prev"] is not None
|
||||
and p["drift_direction"] is not None):
|
||||
drift = (f", drift {p['drift_mi_from_prev']:.1f}mi "
|
||||
f"{p['drift_direction']}")
|
||||
lines.append(
|
||||
f" - pass {p['pass_id']}: {p['pixel_count']} pixel(s)"
|
||||
f"{drift}")
|
||||
return "\n".join(lines)
|
||||
|
|
|
|||
187
tests/test_fire_tracker_phase4.py
Normal file
187
tests/test_fire_tracker_phase4.py
Normal file
|
|
@ -0,0 +1,187 @@
|
|||
"""v0.7-fire-tracker-4 tests."""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
import uuid
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _isolate_db(tmp_path, monkeypatch):
|
||||
db_path = str(tmp_path / f"meshai-{uuid.uuid4().hex}.sqlite")
|
||||
monkeypatch.setenv("MESHAI_DB_PATH", db_path)
|
||||
from meshai.persistence import db as pdb
|
||||
pdb.close_thread_connection()
|
||||
pdb._initialised.discard(db_path)
|
||||
from meshai.persistence import init_db
|
||||
init_db(db_path)
|
||||
yield db_path
|
||||
pdb.close_thread_connection()
|
||||
pdb._initialised.discard(db_path)
|
||||
|
||||
|
||||
def _seed_fire(*, irwin_id, name, lat, lon, acres=None,
|
||||
contained=None, county="Test", state="ID"):
|
||||
from meshai.persistence import get_db
|
||||
get_db().execute(
|
||||
"INSERT INTO fires(irwin_id, incident_name, current_acres, "
|
||||
"current_contained_pct, lat, lon, county, state, last_event_at) "
|
||||
"VALUES (?,?,?,?,?,?,?,?,?)",
|
||||
(irwin_id, name, acres, contained, lat, lon, county, state,
|
||||
int(time.time())),
|
||||
)
|
||||
|
||||
|
||||
# ===========================================================================
|
||||
# Bug A regression: scope_type defined before use
|
||||
# ===========================================================================
|
||||
|
||||
|
||||
def test_router_scope_type_defined_before_env_check():
|
||||
"""The env_reporter check at the top of generate_llm_response reads
|
||||
scope_type. Pre-fix it was UnboundLocalError on every env query."""
|
||||
import re
|
||||
from pathlib import Path
|
||||
src = Path("/opt/meshai/meshai/router.py").read_text()
|
||||
# Find the "if should_inject_mesh and scope_type" line + the
|
||||
# nearest preceding `scope_type, scope_value = ` assignment.
|
||||
env_use_line = None
|
||||
for i, line in enumerate(src.splitlines(), start=1):
|
||||
if "should_inject_mesh and scope_type" in line:
|
||||
env_use_line = i
|
||||
break
|
||||
assert env_use_line is not None
|
||||
# There must be an assignment on or before this line.
|
||||
preceding = "\n".join(src.splitlines()[: env_use_line - 1])
|
||||
assert re.search(r"scope_type[, ]+scope_value\s*=", preceding) \
|
||||
or "scope_type:" in preceding
|
||||
|
||||
|
||||
# ===========================================================================
|
||||
# adapter_config seed + categories registration
|
||||
# ===========================================================================
|
||||
|
||||
|
||||
def test_adapter_config_seeds_digest_keys():
|
||||
from meshai.persistence import get_db
|
||||
rows = {
|
||||
(r["adapter"], r["key"]): r["default_json"]
|
||||
for r in get_db().execute(
|
||||
"SELECT adapter, key, default_json FROM adapter_config "
|
||||
"WHERE adapter='fires' AND key LIKE 'digest%'"
|
||||
)
|
||||
}
|
||||
assert rows[("fires", "digest_enabled")] == "true"
|
||||
assert rows[("fires", "digest_schedule")] == '["06:00", "18:00"]'
|
||||
assert rows[("fires", "digest_timezone")] == '"America/Boise"'
|
||||
assert rows[("fires", "digest_max_chars")] == "200"
|
||||
|
||||
|
||||
# ===========================================================================
|
||||
# Digest renderer
|
||||
# ===========================================================================
|
||||
|
||||
|
||||
def test_render_digest_returns_no_fires_when_table_empty():
|
||||
from meshai.notifications.scheduled.fire_digest import render_digest
|
||||
|
||||
async def _run():
|
||||
return await render_digest(llm_backend=None, max_chars=200)
|
||||
wire, source = asyncio.run(_run())
|
||||
assert wire == ""
|
||||
assert source == "no_fires"
|
||||
|
||||
|
||||
def test_render_digest_terse_fallback_when_no_llm():
|
||||
_seed_fire(irwin_id="ID-A", name="Cache Peak",
|
||||
lat=42.0, lon=-114.0, acres=1847, contained=23)
|
||||
_seed_fire(irwin_id="ID-B", name="Twin Peaks",
|
||||
lat=43.0, lon=-115.0, acres=320, contained=5)
|
||||
from meshai.notifications.scheduled.fire_digest import render_digest
|
||||
|
||||
async def _run():
|
||||
return await render_digest(llm_backend=None, max_chars=200)
|
||||
wire, source = asyncio.run(_run())
|
||||
assert source == "fallback_terse"
|
||||
assert wire
|
||||
assert "Cache Peak" in wire
|
||||
assert len(wire) <= 200
|
||||
|
||||
|
||||
def test_render_digest_uses_llm_when_available():
|
||||
"""When the LLM backend returns a string, that string IS the wire."""
|
||||
_seed_fire(irwin_id="ID-A", name="Cache Peak",
|
||||
lat=42.0, lon=-114.0, acres=1847)
|
||||
|
||||
class StubLLM:
|
||||
async def generate(self, *, messages, system_prompt, max_tokens):
|
||||
# The renderer must give us a single-line wire derived from
|
||||
# the LLM output, with markdown stripped + cap applied.
|
||||
return "Cache Peak 1847 ac stable; no spotting today."
|
||||
|
||||
from meshai.notifications.scheduled.fire_digest import render_digest
|
||||
|
||||
async def _run():
|
||||
return await render_digest(llm_backend=StubLLM(), max_chars=200)
|
||||
wire, source = asyncio.run(_run())
|
||||
assert source == "llm"
|
||||
assert wire == "Cache Peak 1847 ac stable; no spotting today."
|
||||
|
||||
|
||||
# ===========================================================================
|
||||
# Fuzzy fire lookup for ?status
|
||||
# ===========================================================================
|
||||
|
||||
|
||||
def test_status_lookup_exact_name():
|
||||
from meshai.router import _lookup_fire_fuzzy
|
||||
_seed_fire(irwin_id="ID-A", name="Cache Peak",
|
||||
lat=42.0, lon=-114.0, acres=1847)
|
||||
f = _lookup_fire_fuzzy("Cache Peak")
|
||||
assert f is not None
|
||||
assert f["incident_name"] == "Cache Peak"
|
||||
|
||||
|
||||
def test_status_lookup_trims_trailing_fire_word():
|
||||
from meshai.router import _lookup_fire_fuzzy
|
||||
_seed_fire(irwin_id="ID-A", name="Cache Peak",
|
||||
lat=42.0, lon=-114.0, acres=1847)
|
||||
f = _lookup_fire_fuzzy("cache peak fire")
|
||||
assert f is not None
|
||||
assert f["incident_name"] == "Cache Peak"
|
||||
|
||||
|
||||
def test_status_lookup_word_overlap_fallback():
|
||||
from meshai.router import _lookup_fire_fuzzy
|
||||
_seed_fire(irwin_id="ID-A", name="Cache Peak",
|
||||
lat=42.0, lon=-114.0, acres=1847)
|
||||
f = _lookup_fire_fuzzy("how is peak doing")
|
||||
assert f is not None
|
||||
assert f["incident_name"] == "Cache Peak"
|
||||
|
||||
|
||||
def test_status_lookup_returns_none_on_no_match():
|
||||
from meshai.router import _lookup_fire_fuzzy
|
||||
_seed_fire(irwin_id="ID-A", name="Cache Peak",
|
||||
lat=42.0, lon=-114.0, acres=1847)
|
||||
assert _lookup_fire_fuzzy("nonexistent ranger station") is None
|
||||
|
||||
|
||||
def test_status_query_rewrite_includes_fire_context():
|
||||
from meshai.router import _maybe_rewrite_status_query
|
||||
_seed_fire(irwin_id="ID-A", name="Cache Peak",
|
||||
lat=42.0, lon=-114.0, acres=1847, contained=23)
|
||||
out = _maybe_rewrite_status_query("?status Cache Peak", router=None)
|
||||
assert out is not None
|
||||
assert "Cache Peak" in out
|
||||
assert "1847" in out
|
||||
# Must instruct the LLM to be terse mesh format.
|
||||
assert "mesh" in out.lower()
|
||||
|
||||
|
||||
def test_status_query_rewrite_returns_none_when_not_status():
|
||||
from meshai.router import _maybe_rewrite_status_query
|
||||
out = _maybe_rewrite_status_query("how's the weather?", router=None)
|
||||
assert out is None
|
||||
Loading…
Add table
Add a link
Reference in a new issue