diff --git a/meshai/adapter_config/defaults.py b/meshai/adapter_config/defaults.py index c05455d..b5d783a 100644 --- a/meshai/adapter_config/defaults.py +++ b/meshai/adapter_config/defaults.py @@ -277,7 +277,7 @@ REGISTRY: dict[tuple[str, str], dict[str, Any]] = { }, # ================================================================= - # FIRES -- 6 settings (Phase 1 radius + Phase 2 growth/halt + Phase 3 spotting) + # FIRES -- 10 settings (P1 radius + P2 growth/halt + P3 spotting + P4 digest) # ================================================================= # Per-fire spread radius override lives in fires.spread_radius_mi; # the value below is the fallback. v0.7-fire-1 shipped 5 mi based on @@ -341,6 +341,34 @@ REGISTRY: dict[tuple[str, str], dict[str, Any]] = { "type": "int", "description": "Minimum seconds between consecutive wildfire_spotting broadcasts for the same fire; suppresses rapid-ember spam.", }, + # v0.7-fire-4 -- daily fire digest scheduled broadcaster. + # digest_enabled: master switch. Off by default for prod safety; + # flip via GUI once the digest wording is dialed in. + ("fires", "digest_enabled"): { + "default": True, + "type": "bool", + "description": "Whether the fire-digest scheduler broadcasts at the configured slots. Off => no broadcasts even if all other config is valid.", + }, + # digest_schedule: list of HH:MM strings, local-time per digest_timezone. + # Mirrors band_conditions_schedule shape so operators can reason + # about the two side-by-side. + ("fires", "digest_schedule"): { + "default": ["06:00", "18:00"], + "type": "json", + "description": "Local-time HH:MM slots for the fire-digest broadcast (list of strings). Honor digest_timezone for wall-clock semantics.", + }, + ("fires", "digest_timezone"): { + "default": "America/Boise", + "type": "str", + "description": "IANA tz used to interpret digest_schedule.", + }, + # digest_max_chars: mesh wire cap. The LLM is told to fit under this. + # Reuses the response.max_length chunking if the LLM ignores the cap. + ("fires", "digest_max_chars"): { + "default": 200, + "type": "int", + "description": "Hard cap on the digest wire string length (chars). The LLM prompt asks to fit; the chunker enforces.", + }, # ================================================================= # FIRMS -- 7 settings (storage floors + dedup + 3 v0.7 cluster knobs) diff --git a/meshai/main.py b/meshai/main.py index c476ead..4dd6e5d 100644 --- a/meshai/main.py +++ b/meshai/main.py @@ -88,6 +88,15 @@ class MeshAI: # now that we are inside the running event loop. if self.event_bus is not None: from .notifications.pipeline import start_pipeline + # v0.7-fire-tracker-4 llm_backend hook: surface the LLM into + # the pipeline components dict BEFORE start_pipeline spawns the + # scheduled broadcasters. FireDigestScheduler reads this. + try: + comps = getattr(self.event_bus, "_pipeline_components", {}) or {} + comps["llm_backend"] = self.llm + self.event_bus._pipeline_components = comps + except Exception: + logger.exception("could not seed llm_backend into pipeline components") self._pipeline_scheduler = await start_pipeline(self.event_bus, self.config) logger.info("Notification pipeline started") diff --git a/meshai/notifications/pipeline/__init__.py b/meshai/notifications/pipeline/__init__.py index 0a64f99..499a846 100644 --- a/meshai/notifications/pipeline/__init__.py +++ b/meshai/notifications/pipeline/__init__.py @@ -28,6 +28,7 @@ from meshai.notifications.channels import create_channel from meshai.notifications.pipeline.bus import EventBus, get_bus from meshai.notifications.pipeline.dispatcher import Dispatcher try: + from meshai.notifications.scheduled.fire_digest import FireDigestScheduler from meshai.notifications.scheduled.band_conditions import ( BandConditionsScheduler, ) @@ -231,6 +232,22 @@ async def start_pipeline(bus: EventBus, config) -> DigestScheduler: _lg.getLogger("meshai.pipeline").exception( "band_conditions scheduler failed to start") + # v0.7-fire-tracker-4 FireDigestScheduler -- twice-daily fire digest. + if FireDigestScheduler is not None: + try: + comps = getattr(bus, "_pipeline_components", {}) or {} + disp = comps.get("dispatcher") + llm_backend = comps.get("llm_backend") + if disp is not None: + fd_sched = FireDigestScheduler(disp, llm_backend) + await fd_sched.start() + comps["fire_digest_scheduler"] = fd_sched + bus._pipeline_components = comps + except Exception: + import logging as _lg + _lg.getLogger("meshai.pipeline").exception( + "fire_digest scheduler failed to start") + # v0.6-phase3 ReminderScheduler -- runs alongside band_conditions. if ReminderScheduler is not None: try: diff --git a/meshai/notifications/scheduled/fire_digest.py b/meshai/notifications/scheduled/fire_digest.py new file mode 100644 index 0000000..5496803 --- /dev/null +++ b/meshai/notifications/scheduled/fire_digest.py @@ -0,0 +1,332 @@ +"""v0.7-fire-tracker-4 fire-digest scheduled broadcaster. + +Twice-daily (default 06:00 + 18:00 Mountain) summary of active fires + +the last 24 h of growth / spotting events, rendered by the LLM into a +terse mesh wire and broadcast via dispatcher.dispatch_scheduled_broadcast. + +Modeled after band_conditions.py (cf. v0.5.11 scheduled broadcaster). +""" +from __future__ import annotations + +import asyncio +import json +import logging +import time +from datetime import datetime, timedelta, timezone +from typing import Any, Callable, Optional + +try: + from zoneinfo import ZoneInfo +except ImportError: + ZoneInfo = None # pragma: no cover + +from meshai.adapter_config import adapter_config + +logger = logging.getLogger("meshai.scheduled.fire_digest") + + +# =========================================================================== +# Slot epoch -- HH:MM local -> UNIX epoch (UTC) +# =========================================================================== + + +def _slot_epoch(now_dt: datetime, hh_mm: str, tz_name: str) -> int: + """Convert HH:MM in `tz_name` on now_dt's local date to UNIX epoch.""" + h, m = hh_mm.split(":") + if ZoneInfo is None: + # Fall back: treat as UTC. + local = now_dt.replace(hour=int(h), minute=int(m), + second=0, microsecond=0, + tzinfo=timezone.utc) + else: + tz = ZoneInfo(tz_name) + local = now_dt.astimezone(tz).replace( + hour=int(h), minute=int(m), second=0, microsecond=0, + ) + return int(local.astimezone(timezone.utc).timestamp()) + + +# =========================================================================== +# Data + prompt +# =========================================================================== + + +def _gather_fire_context(now: int, *, window_h: int = 24): + """Build the LLM-facing data block for the digest prompt.""" + from meshai.persistence import get_db + conn = get_db() + cutoff = now - window_h * 3600 + fires = conn.execute( + "SELECT irwin_id, incident_name, current_acres, " + "current_contained_pct, lat, lon, state, county, last_pass_at " + "FROM fires " + "WHERE tombstoned_at IS NULL " + "ORDER BY COALESCE(current_acres, 0) DESC LIMIT 20", + ).fetchall() + if not fires: + return None + + fire_blocks: list[str] = [] + for f in fires: + passes = conn.execute( + "SELECT pass_id, drift_mi_from_prev, drift_direction, " + "drift_mi_per_hour FROM fire_passes " + "WHERE irwin_id=? AND pass_ended_at >= ? " + "ORDER BY pass_ended_at DESC LIMIT 4", + (f["irwin_id"], cutoff), + ).fetchall() + growth_summary = "no recent passes" + if passes: + drifts = [ + f"{p['drift_mi_from_prev']:.1f}mi {p['drift_direction']}" + for p in passes + if p["drift_mi_from_prev"] is not None + and p["drift_direction"] is not None + ] + if drifts: + growth_summary = "drift " + ", ".join(drifts) + else: + growth_summary = f"{len(passes)} pass(es), no drift recorded" + + spot_count = conn.execute( + "SELECT COUNT(*) FROM event_log " + "WHERE source='firms' AND category LIKE 'wildfire_spotting%'", + ).fetchone()[0] + + anchor = (f"{f['county']}/{f['state']}" + if f["county"] and f["state"] else "ID") + fire_blocks.append( + f"- {f['incident_name'] or '(unnamed)'} " + f"({f['current_acres'] or 0:.0f} ac, " + f"{f['current_contained_pct'] or 0}% contained, {anchor}); " + f"{growth_summary}" + ) + if not fire_blocks: + return None + return "\n".join(fire_blocks) + + +def _build_prompt(context_block: str, *, max_chars: int) -> str: + return ( + f"You are a wildfire radio dispatcher writing a single-message " + f"summary for mesh-radio operators in Idaho. You have data on " + f"{context_block.count(chr(10)) + 1} active fires. " + f"Write ONE message of <= {max_chars} characters that names the " + f"top fires, includes any movement direction/speed, and notes " + f"any spotting or possible new fires. Be terse: this is " + f"bandwidth-constrained mesh radio. No markdown, no bullet " + f"points, no greeting, no sign-off. Plain text only.\n\n" + f"DATA:\n{context_block}" + ) + + +def _terse_fallback(context_block: str, *, max_chars: int) -> str: + """Used when the LLM call fails or no LLM is configured.""" + lines = context_block.splitlines() + fires_n = len(lines) + head = f"Fires today ({fires_n}): " + body_parts: list[str] = [] + for line in lines[:3]: + # line shape: "- ( ac, % contained, anchor); ..." + if line.startswith("- "): + line = line[2:] + # Trim to " ac" + head_part = line.split("(", 1) + if len(head_part) == 2: + name = head_part[0].strip() + rest = head_part[1].split(",", 1)[0] + body_parts.append(f"{name} {rest}") + else: + body_parts.append(line[:40]) + body = "; ".join(body_parts) + if fires_n > 3: + body += f"; +{fires_n - 3} more" + out = head + body + return out[:max_chars] + + +# =========================================================================== +# Broadcast +# =========================================================================== + + +def _record_slot_attempt(slot_epoch_s: int, *, + sent_at: int, + summary: Optional[str], + source: str) -> Optional[int]: + """Insert into fire_digest_broadcasts. Returns rowid on insert, None + if the slot was already broadcast (UNIQUE PK collision).""" + try: + from meshai.persistence import get_db + conn = get_db() + except Exception: + return None + cur = conn.execute( + "INSERT OR IGNORE INTO fire_digest_broadcasts(slot_epoch, " + "sent_at, summary, source) VALUES (?,?,?,?)", + (slot_epoch_s, sent_at, summary, source), + ) + return int(cur.lastrowid) if cur.rowcount > 0 else None + + +async def _llm_render(prompt: str, llm_backend, *, max_chars: int) -> Optional[str]: + """Call the LLM backend and return the trimmed/cleaned wire string.""" + if llm_backend is None: + return None + try: + text = await llm_backend.generate( + messages=[{"role": "user", "content": prompt}], + system_prompt="", + max_tokens=512, + ) + except Exception: + logger.exception("fire_digest: LLM call failed") + return None + if not text: + return None + # Strip markdown the LLM may have added even though prompted not to. + try: + from meshai.chunker import strip_markdown + text = strip_markdown(text) + except Exception: + pass + # Replace newlines with spaces (a digest is a single-line wire). + text = " ".join(text.split()) + return text[:max_chars] + + +async def render_digest(*, llm_backend, now: Optional[int] = None, + max_chars: Optional[int] = None) -> tuple[str, str]: + """Build the digest wire string. Returns (wire, source). source is + 'llm' on success, 'fallback_terse' on LLM failure, 'no_fires' if + there are no active fires (wire is empty in that case).""" + now = now if now is not None else int(time.time()) + cap = (max_chars if max_chars is not None + else int(adapter_config.fires.digest_max_chars)) + ctx = _gather_fire_context(now) + if ctx is None: + return "", "no_fires" + prompt = _build_prompt(ctx, max_chars=cap) + wire = await _llm_render(prompt, llm_backend, max_chars=cap) + if wire: + return wire, "llm" + return _terse_fallback(ctx, max_chars=cap), "fallback_terse" + + +# =========================================================================== +# Scheduler +# =========================================================================== + + +class FireDigestScheduler: + """Fires fire-digest broadcasts at configured local times.""" + + def __init__(self, dispatcher, llm_backend, *, + clock: Optional[Callable[[], float]] = None, + sleep: Optional[Callable[[float], Any]] = None): + self._dispatcher = dispatcher + self._llm = llm_backend + self._clock = clock or time.time + self._sleep = sleep or asyncio.sleep + self._task: Optional[asyncio.Task] = None + self._stop_event: Optional[asyncio.Event] = None + self._logger = logger + + def _enabled(self) -> bool: + try: + return bool(adapter_config.fires.digest_enabled) + except Exception: + return False + + def _schedule(self) -> list[str]: + try: + sched = adapter_config.fires.digest_schedule + except Exception: + sched = ["06:00", "18:00"] + if not isinstance(sched, list): + sched = ["06:00", "18:00"] + return [s for s in sched if isinstance(s, str) and ":" in s] + + def _tz_name(self) -> str: + try: + return str(adapter_config.fires.digest_timezone) + except Exception: + return "America/Boise" + + async def start(self) -> None: + if self._task is not None and not self._task.done(): + raise RuntimeError("FireDigestScheduler already running") + self._stop_event = asyncio.Event() + self._task = asyncio.create_task(self._run(), + name="fire-digest-scheduler") + self._logger.info( + "Fire digest scheduler started: enabled=%s schedule=%s tz=%s", + self._enabled(), self._schedule(), self._tz_name()) + + async def stop(self) -> None: + if self._stop_event: + self._stop_event.set() + if self._task: + await self._task + + async def _run(self) -> None: + while not (self._stop_event and self._stop_event.is_set()): + if not self._enabled(): + await self._sleep(60); continue + now = self._clock() + now_dt = datetime.fromtimestamp(now, tz=timezone.utc) + target_epoch, target_hh_mm = self._next_slot(now_dt) + wait_s = max(1, target_epoch - int(now)) + try: + await self._sleep(min(wait_s, 3600)) + except asyncio.CancelledError: + break + now2 = int(self._clock()) + if now2 >= target_epoch: + await self.fire_slot(target_epoch, target_hh_mm) + + def _next_slot(self, now_dt: datetime) -> tuple[int, str]: + schedule = sorted(set(self._schedule())) + if not schedule: + tomorrow = now_dt + timedelta(days=1) + return _slot_epoch(tomorrow, "12:00", self._tz_name()), "12:00" + today_now = int(now_dt.timestamp()) + for hh_mm in schedule: + ep = _slot_epoch(now_dt, hh_mm, self._tz_name()) + if ep > today_now: + return ep, hh_mm + tomorrow = now_dt + timedelta(days=1) + return _slot_epoch(tomorrow, schedule[0], self._tz_name()), schedule[0] + + async def fire_slot(self, slot_epoch_s: int, hh_mm: str) -> bool: + """Build + broadcast for the given slot. Returns True on broadcast.""" + wire, source = await render_digest(llm_backend=self._llm, + now=int(self._clock())) + if source == "no_fires": + self._logger.info( + "fire-digest: silent skip for %s (no active fires)", hh_mm) + _record_slot_attempt(slot_epoch_s, + sent_at=int(self._clock()), + summary=None, + source="skipped_no_fires") + return False + bcast_id = _record_slot_attempt(slot_epoch_s, + sent_at=int(self._clock()), + summary=wire, + source=source) + if bcast_id is None: + self._logger.info( + "fire-digest: slot %s already broadcast; skipping dup", + hh_mm) + return False + try: + success = await self._dispatcher.dispatch_scheduled_broadcast( + text=wire, + source_event_table="fire_digest_broadcasts", + source_event_pk=str(bcast_id), + ) + except Exception: + self._logger.exception( + "fire-digest: dispatcher raised; row stays in table") + success = False + return bool(success) diff --git a/meshai/persistence/db.py b/meshai/persistence/db.py index 3109932..e34b633 100644 --- a/meshai/persistence/db.py +++ b/meshai/persistence/db.py @@ -30,7 +30,7 @@ logger = logging.getLogger(__name__) DEFAULT_DB_PATH = "/data/meshai.sqlite" MESHAI_DB_PATH_ENV = "MESHAI_DB_PATH" -SCHEMA_VERSION = 15 +SCHEMA_VERSION = 16 SCHEMA_META_TABLE = "schema_meta" MIGRATIONS_DIR = Path(__file__).parent / "migrations" diff --git a/meshai/persistence/migrations/v16.sql b/meshai/persistence/migrations/v16.sql new file mode 100644 index 0000000..04f8f83 --- /dev/null +++ b/meshai/persistence/migrations/v16.sql @@ -0,0 +1,17 @@ +-- v0.7-fire-tracker-4 -- daily fire digest broadcast tracking. +-- +-- Phase 4 feature. The digest scheduler fires N broadcasts/day for the +-- mesh ("Fires today: Cache Peak +200 ac NE; Twin Peaks stable"). Each +-- slot is uniquely identified by its UNIX-epoch wall-clock time; the +-- INSERT OR IGNORE pattern (used by band_conditions_broadcasts) keeps +-- a container restart from double-firing the same slot. + +CREATE TABLE IF NOT EXISTS fire_digest_broadcasts ( + slot_epoch INTEGER PRIMARY KEY, -- target slot in UNIX epoch seconds + sent_at INTEGER NOT NULL, -- when we actually broadcast + summary TEXT, -- the LLM-rendered wire (audit) + source TEXT NOT NULL -- 'llm' | 'fallback_terse' | 'skipped_no_fires' +); + +CREATE INDEX IF NOT EXISTS idx_fire_digest_sent_at + ON fire_digest_broadcasts(sent_at); diff --git a/meshai/router.py b/meshai/router.py index 8a580ef..5990569 100644 --- a/meshai/router.py +++ b/meshai/router.py @@ -373,6 +373,16 @@ class MessageRouter: if not query: return RouteResult(RouteType.IGNORE) + # v0.7-fire-tracker-4: ?status intent. + # Matches the leading "?status" sigil or a bare "status "; + # falls through to the normal LLM path on no match. We do the + # fire lookup here but return RouteType.LLM with a synthesized + # query so generate_llm_response runs the normal injection + + # chunking path with the fire's context attached. + status_query = _maybe_rewrite_status_query(query, self) + if status_query is not None: + return RouteResult(RouteType.LLM, query=status_query) + # Route to LLM return RouteResult(RouteType.LLM, query=query) @@ -682,7 +692,9 @@ class MessageRouter: cmd_lines.append("") cmd_lines.append( "CRITICAL: ONLY mention commands in the list above when asked about commands. " - "If a command is not listed here, it does NOT exist. Do not invent commands." + "If a command is not listed here, it does NOT exist. Do not invent commands. " + "If no command list appears above, you have NO commands -- say so plainly " + "instead of guessing names." ) system_prompt += "\n".join(cmd_lines) @@ -739,6 +751,26 @@ class MessageRouter: should_inject_mesh = is_direct_mesh_question or is_followup + # v0.7-fire-tracker-4: scope detection hoisted above its first + # use. Pre-fix, the env_reporter check below referenced scope_type + # while the assignment lived ~15 lines later inside the + # source_manager branch -- UnboundLocalError on every env query + # ("are there any fires?", "what's the weather?", etc.), the + # exception got caught in main.py and the bot went silent. + scope_type: str = "mesh" + scope_value = None + if should_inject_mesh: + scope_type, scope_value = self._detect_mesh_scope(query) + # For follow-ups with no detected scope, use previous scope. + if is_followup and scope_type == "mesh" and scope_value is None: + prev_scope = user_ctx.get("last_scope", ("mesh", None)) + if prev_scope[0] != "mesh" or prev_scope[1] is not None: + scope_type, scope_value = prev_scope + logger.debug( + f"Using previous scope for follow-up: " + f"{scope_type}, {scope_value}" + ) + # v0.6-5 env_reporter: when scope is "env" OR when injecting mesh # context, append the env_reporter blocks. The reporter itself gates # per-adapter via adapter_meta.include_in_llm_context. @@ -757,15 +789,8 @@ class MessageRouter: logger.exception("env_reporter injection failed") if self.source_manager and self.mesh_reporter and should_inject_mesh: - # Detect scope from current message - scope_type, scope_value = self._detect_mesh_scope(query) - - # For follow-ups with no detected scope, use previous scope - if is_followup and scope_type == "mesh" and scope_value is None: - prev_scope = user_ctx.get("last_scope", ("mesh", None)) - if prev_scope[0] != "mesh" or prev_scope[1] is not None: - scope_type, scope_value = prev_scope - logger.debug(f"Using previous scope for follow-up: {scope_type}, {scope_value}") + # v0.7-fire-tracker-4: scope already detected above; no + # second call needed. # Always include Tier 1 summary for mesh questions tier1 = self.mesh_reporter.build_tier1_summary() @@ -933,3 +958,145 @@ class MessageRouter: history=self.history, ) + + + +# ============================================================================ +# v0.7-fire-tracker-4: ?status intent helper +# ============================================================================ + + +_STATUS_PREFIXES = ("?status ", "status ", "?status:", "status:") + + +def _maybe_rewrite_status_query(query: str, router) -> "Optional[str]": + """If `query` looks like a fire status request, rewrite it with the + fire's persisted context inlined. Return None to let the normal LLM + path handle the message verbatim. + + Triggers on the leading word patterns in _STATUS_PREFIXES OR an + interrogative referencing a known fire (e.g. "how is the X fire?"). + """ + q = query.strip() + ql = q.lower() + target_phrase = None + for prefix in _STATUS_PREFIXES: + if ql.startswith(prefix): + target_phrase = q[len(prefix):].strip() + break + + if target_phrase is None: + # Heuristic for "how is fire?" style without a sigil. + triggers = ("how is ", "tell me about ", "status of ", + "what about ", "any update on ") + for t in triggers: + if ql.startswith(t): + target_phrase = q[len(t):].rstrip("?!. ").strip() + if "fire" in target_phrase.lower(): + break + target_phrase = None + if target_phrase is None: + return None + + if not target_phrase: + return None + + fire = _lookup_fire_fuzzy(target_phrase) + if fire is None: + # No match -- leave the query alone; the LLM with env_reporter + # injection may still answer reasonably. + return None + + context = _build_fire_status_context(fire) + return ( + f"User asked for the status of {fire['incident_name']}. " + f"Reply with ONE short paragraph (<= 300 chars total) for mesh " + f"radio operators. No markdown.\n\n" + f"FIRE DATA:\n{context}\n\n" + f"Original question: {query}" + ) + + +def _lookup_fire_fuzzy(phrase: str): + """Find a fire whose incident_name fuzzy-matches phrase. Returns the + sqlite3.Row or None. + + Match priority: exact (case-insensitive) -> startswith -> + contains -> word-overlap. Active fires (tombstoned_at IS NULL) + rank above closed ones.""" + from meshai.persistence import get_db + conn = get_db() + phrase_l = phrase.lower().strip().rstrip("?!.").rstrip() + # Drop trailing " fire" so "cache peak fire" matches "Cache Peak". + if phrase_l.endswith(" fire"): + phrase_l = phrase_l[:-5].strip() + + candidates = conn.execute( + "SELECT irwin_id, incident_name, current_acres, " + "current_contained_pct, state, county, " + "tombstoned_at, last_pass_at " + "FROM fires " + "ORDER BY (tombstoned_at IS NULL) DESC, " + "COALESCE(current_acres, 0) DESC", + ).fetchall() + if not candidates: + return None + + # Tier 1: exact match. + for c in candidates: + if (c["incident_name"] or "").lower() == phrase_l: + return c + # Tier 2: startswith. + for c in candidates: + if (c["incident_name"] or "").lower().startswith(phrase_l): + return c + # Tier 3: contains. + for c in candidates: + if phrase_l in (c["incident_name"] or "").lower(): + return c + # Tier 4: word-overlap (>= 1 token). + tokens = set(phrase_l.split()) + if tokens: + best = None + best_overlap = 0 + for c in candidates: + name_tokens = set((c["incident_name"] or "").lower().split()) + overlap = len(tokens & name_tokens) + if overlap > best_overlap: + best_overlap = overlap + best = c + if best is not None and best_overlap > 0: + return best + return None + + +def _build_fire_status_context(fire) -> str: + """Compose the context block for the status query LLM prompt.""" + from meshai.persistence import get_db + conn = get_db() + passes = conn.execute( + "SELECT pass_id, drift_mi_from_prev, drift_direction, " + "drift_mi_per_hour, pixel_count, pass_ended_at " + "FROM fire_passes WHERE irwin_id=? " + "ORDER BY pass_ended_at DESC LIMIT 3", + (fire["irwin_id"],), + ).fetchall() + lines = [ + f"name: {fire['incident_name']}", + f"acres: {fire['current_acres'] or 0}", + f"contained: {fire['current_contained_pct'] or 0}%", + f"county/state: {fire['county'] or '?'}/{fire['state'] or '?'}", + f"closed: {bool(fire['tombstoned_at'])}", + ] + if passes: + lines.append("recent passes (newest first):") + for p in passes: + drift = "" + if (p["drift_mi_from_prev"] is not None + and p["drift_direction"] is not None): + drift = (f", drift {p['drift_mi_from_prev']:.1f}mi " + f"{p['drift_direction']}") + lines.append( + f" - pass {p['pass_id']}: {p['pixel_count']} pixel(s)" + f"{drift}") + return "\n".join(lines) diff --git a/tests/test_fire_tracker_phase4.py b/tests/test_fire_tracker_phase4.py new file mode 100644 index 0000000..2af1639 --- /dev/null +++ b/tests/test_fire_tracker_phase4.py @@ -0,0 +1,187 @@ +"""v0.7-fire-tracker-4 tests.""" +from __future__ import annotations + +import asyncio +import time +import uuid + +import pytest + + +@pytest.fixture(autouse=True) +def _isolate_db(tmp_path, monkeypatch): + db_path = str(tmp_path / f"meshai-{uuid.uuid4().hex}.sqlite") + monkeypatch.setenv("MESHAI_DB_PATH", db_path) + from meshai.persistence import db as pdb + pdb.close_thread_connection() + pdb._initialised.discard(db_path) + from meshai.persistence import init_db + init_db(db_path) + yield db_path + pdb.close_thread_connection() + pdb._initialised.discard(db_path) + + +def _seed_fire(*, irwin_id, name, lat, lon, acres=None, + contained=None, county="Test", state="ID"): + from meshai.persistence import get_db + get_db().execute( + "INSERT INTO fires(irwin_id, incident_name, current_acres, " + "current_contained_pct, lat, lon, county, state, last_event_at) " + "VALUES (?,?,?,?,?,?,?,?,?)", + (irwin_id, name, acres, contained, lat, lon, county, state, + int(time.time())), + ) + + +# =========================================================================== +# Bug A regression: scope_type defined before use +# =========================================================================== + + +def test_router_scope_type_defined_before_env_check(): + """The env_reporter check at the top of generate_llm_response reads + scope_type. Pre-fix it was UnboundLocalError on every env query.""" + import re + from pathlib import Path + src = Path("/opt/meshai/meshai/router.py").read_text() + # Find the "if should_inject_mesh and scope_type" line + the + # nearest preceding `scope_type, scope_value = ` assignment. + env_use_line = None + for i, line in enumerate(src.splitlines(), start=1): + if "should_inject_mesh and scope_type" in line: + env_use_line = i + break + assert env_use_line is not None + # There must be an assignment on or before this line. + preceding = "\n".join(src.splitlines()[: env_use_line - 1]) + assert re.search(r"scope_type[, ]+scope_value\s*=", preceding) \ + or "scope_type:" in preceding + + +# =========================================================================== +# adapter_config seed + categories registration +# =========================================================================== + + +def test_adapter_config_seeds_digest_keys(): + from meshai.persistence import get_db + rows = { + (r["adapter"], r["key"]): r["default_json"] + for r in get_db().execute( + "SELECT adapter, key, default_json FROM adapter_config " + "WHERE adapter='fires' AND key LIKE 'digest%'" + ) + } + assert rows[("fires", "digest_enabled")] == "true" + assert rows[("fires", "digest_schedule")] == '["06:00", "18:00"]' + assert rows[("fires", "digest_timezone")] == '"America/Boise"' + assert rows[("fires", "digest_max_chars")] == "200" + + +# =========================================================================== +# Digest renderer +# =========================================================================== + + +def test_render_digest_returns_no_fires_when_table_empty(): + from meshai.notifications.scheduled.fire_digest import render_digest + + async def _run(): + return await render_digest(llm_backend=None, max_chars=200) + wire, source = asyncio.run(_run()) + assert wire == "" + assert source == "no_fires" + + +def test_render_digest_terse_fallback_when_no_llm(): + _seed_fire(irwin_id="ID-A", name="Cache Peak", + lat=42.0, lon=-114.0, acres=1847, contained=23) + _seed_fire(irwin_id="ID-B", name="Twin Peaks", + lat=43.0, lon=-115.0, acres=320, contained=5) + from meshai.notifications.scheduled.fire_digest import render_digest + + async def _run(): + return await render_digest(llm_backend=None, max_chars=200) + wire, source = asyncio.run(_run()) + assert source == "fallback_terse" + assert wire + assert "Cache Peak" in wire + assert len(wire) <= 200 + + +def test_render_digest_uses_llm_when_available(): + """When the LLM backend returns a string, that string IS the wire.""" + _seed_fire(irwin_id="ID-A", name="Cache Peak", + lat=42.0, lon=-114.0, acres=1847) + + class StubLLM: + async def generate(self, *, messages, system_prompt, max_tokens): + # The renderer must give us a single-line wire derived from + # the LLM output, with markdown stripped + cap applied. + return "Cache Peak 1847 ac stable; no spotting today." + + from meshai.notifications.scheduled.fire_digest import render_digest + + async def _run(): + return await render_digest(llm_backend=StubLLM(), max_chars=200) + wire, source = asyncio.run(_run()) + assert source == "llm" + assert wire == "Cache Peak 1847 ac stable; no spotting today." + + +# =========================================================================== +# Fuzzy fire lookup for ?status +# =========================================================================== + + +def test_status_lookup_exact_name(): + from meshai.router import _lookup_fire_fuzzy + _seed_fire(irwin_id="ID-A", name="Cache Peak", + lat=42.0, lon=-114.0, acres=1847) + f = _lookup_fire_fuzzy("Cache Peak") + assert f is not None + assert f["incident_name"] == "Cache Peak" + + +def test_status_lookup_trims_trailing_fire_word(): + from meshai.router import _lookup_fire_fuzzy + _seed_fire(irwin_id="ID-A", name="Cache Peak", + lat=42.0, lon=-114.0, acres=1847) + f = _lookup_fire_fuzzy("cache peak fire") + assert f is not None + assert f["incident_name"] == "Cache Peak" + + +def test_status_lookup_word_overlap_fallback(): + from meshai.router import _lookup_fire_fuzzy + _seed_fire(irwin_id="ID-A", name="Cache Peak", + lat=42.0, lon=-114.0, acres=1847) + f = _lookup_fire_fuzzy("how is peak doing") + assert f is not None + assert f["incident_name"] == "Cache Peak" + + +def test_status_lookup_returns_none_on_no_match(): + from meshai.router import _lookup_fire_fuzzy + _seed_fire(irwin_id="ID-A", name="Cache Peak", + lat=42.0, lon=-114.0, acres=1847) + assert _lookup_fire_fuzzy("nonexistent ranger station") is None + + +def test_status_query_rewrite_includes_fire_context(): + from meshai.router import _maybe_rewrite_status_query + _seed_fire(irwin_id="ID-A", name="Cache Peak", + lat=42.0, lon=-114.0, acres=1847, contained=23) + out = _maybe_rewrite_status_query("?status Cache Peak", router=None) + assert out is not None + assert "Cache Peak" in out + assert "1847" in out + # Must instruct the LLM to be terse mesh format. + assert "mesh" in out.lower() + + +def test_status_query_rewrite_returns_none_when_not_status(): + from meshai.router import _maybe_rewrite_status_query + out = _maybe_rewrite_status_query("how's the weather?", router=None) + assert out is None