From 493b43f7cf0e9374b700f22ed9dfbc61f0920f43 Mon Sep 17 00:00:00 2001 From: K7ZVX Date: Thu, 14 May 2026 22:32:51 +0000 Subject: [PATCH] feat(notifications): Phase 2.3b digest scheduler Adds DigestScheduler class that fires digest at configured time (default 07:00) and routes to rules with trigger_type=schedule and schedule_match=digest. - DigestScheduler: asyncio task with start/stop lifecycle - Config: DigestConfig dataclass with schedule and include fields - Config: schedule_match field on NotificationRuleConfig - Pipeline: start_pipeline/stop_pipeline async lifecycle functions - Mesh channels get per-chunk delivery, email/webhook get full text - 26 new tests covering schedule computation, fire behavior, lifecycle Co-Authored-By: Claude Opus 4.5 --- config.example.yaml | 680 ++++----- meshai/config.py | 1510 ++++++++++---------- meshai/notifications/pipeline/__init__.py | 90 +- meshai/notifications/pipeline/scheduler.py | 213 +++ tests/test_pipeline_scheduler.py | 587 ++++++++ 5 files changed, 1998 insertions(+), 1082 deletions(-) create mode 100644 meshai/notifications/pipeline/scheduler.py create mode 100644 tests/test_pipeline_scheduler.py diff --git a/config.example.yaml b/config.example.yaml index da73362..7951523 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -1,328 +1,352 @@ -# MeshAI Configuration -# LLM-powered Meshtastic assistant -# -# Copy this to config.yaml and customize as needed -# For Docker: mount as /data/config.yaml - -# === BOT IDENTITY === -bot: - name: ai # Bot's display name - owner: "" # Owner's callsign (optional) - respond_to_dms: true # Respond to direct messages - filter_bbs_protocols: true # Ignore advBBS sync/notification messages - -# === MESHTASTIC CONNECTION === -connection: - type: tcp # serial | tcp - serial_port: /dev/ttyUSB0 # For serial connection - tcp_host: localhost # For TCP connection (meshtasticd) - tcp_port: 4403 - -# === RESPONSE BEHAVIOR === -response: - delay_min: 2.2 # Min delay before responding (seconds) - delay_max: 3.0 # Max delay before responding - max_length: 200 # Max chars per message chunk - max_messages: 3 # Max message chunks per response - -# === CONVERSATION HISTORY === -history: - database: /data/conversations.db - max_messages_per_user: 50 # Messages to keep per user - conversation_timeout: 86400 # Conversation expiry (seconds, 86400=24h) - auto_cleanup: true # Auto-delete old conversations - cleanup_interval_hours: 24 # How often to run cleanup - max_age_days: 30 # Delete conversations older than this - -# === MEMORY OPTIMIZATION === -memory: - enabled: true # Enable rolling summary memory - window_size: 4 # Recent message pairs to keep in full - summarize_threshold: 8 # Messages before re-summarizing - -# === MESH CONTEXT === -context: - enabled: true # Observe channel traffic for LLM context - observe_channels: [] # Channel indices to observe (empty = all) - ignore_nodes: [] # Node IDs to exclude from observation - max_age: 2592000 # Max age in seconds (default 30 days) - max_context_items: 20 # Max observations injected into LLM context - -# === LLM BACKEND === -llm: - backend: openai # openai | anthropic | google - api_key: "" # API key (or use LLM_API_KEY env var) - base_url: https://api.openai.com/v1 # API base URL - model: gpt-4o-mini # Model name - timeout: 30 # Request timeout (seconds) - system_prompt: >- - You are a helpful assistant on a Meshtastic mesh network. - Keep responses very brief - 1-2 short sentences, under 300 characters. - Only give longer answers if the user explicitly asks for detail or explanation. - Be concise but friendly. No markdown formatting. - google_grounding: false # Enable Google Search grounding (Gemini only, $35/1k queries) - -# === WEATHER === -weather: - primary: openmeteo # openmeteo | wttr | llm - fallback: llm # openmeteo | wttr | llm | none - default_location: "" # Default location for !weather (optional) - -# === MESHMONITOR INTEGRATION === -meshmonitor: - enabled: false # Enable MeshMonitor trigger sync - url: "" # MeshMonitor web UI URL (e.g. http://192.168.1.100:3333) - inject_into_prompt: true # Include trigger list in LLM prompt - refresh_interval: 300 # Seconds between trigger refreshes - -# === KNOWLEDGE BASE (RAG) === -knowledge: - enabled: false # Enable knowledge base search - db_path: "" # Path to knowledge SQLite database - top_k: 5 # Number of chunks to retrieve per query - -# === MESH DATA SOURCES === -# Connect to Meshview and/or MeshMonitor instances for live mesh -# network analysis. Supports multiple sources. Configure via TUI -# with meshai --config (Mesh Sources menu). -# -# mesh_sources: -# - name: "my-meshview" -# type: meshview -# url: "https://meshview.example.com" -# refresh_interval: 300 -# enabled: true -# -# - name: "my-meshmonitor" -# type: meshmonitor -# url: "http://192.168.1.100:3333" -# api_token: "${MM_API_TOKEN}" -# refresh_interval: 300 -# enabled: true -# -# - name: "mqtt-broker" -# type: mqtt -# host: "mqtt.meshtastic.org" -# port: 1883 -# username: "meshdev" -# password: "large4cats" -# topic_root: "msh/US" -# use_tls: false -# enabled: true -mesh_sources: [] - -# === MESH INTELLIGENCE === -# Geographic clustering and health scoring for mesh analysis. -# Requires mesh_sources to be configured with at least one data source. -# -# mesh_intelligence: -# enabled: true -# region_radius_miles: 40.0 # Radius for region clustering -# locality_radius_miles: 8.0 # Radius for locality clustering -# offline_threshold_hours: 2 # Hours before node considered offline -# packet_threshold: 500 # Non-text packets per 24h to flag -# battery_warning_percent: 30 # Battery level for warnings -# infra_overrides: [] # Node IDs to exclude from infrastructure -# region_labels: {} # Override auto-names: {"Twin Falls": "Magic Valley"} -mesh_intelligence: - enabled: false - region_radius_miles: 40.0 - locality_radius_miles: 8.0 - offline_threshold_hours: 2 - packet_threshold: 500 - battery_warning_percent: 30 - infra_overrides: [] - region_labels: {} - -# === ENVIRONMENTAL FEEDS === -# Live situational awareness from NWS, NOAA Space Weather, and Open-Meteo. -# Provides weather alerts, HF propagation assessment, and tropospheric ducting. -# -environmental: - enabled: false - nws_zones: - - "IDZ016" # Western Magic Valley - - "IDZ030" # Southern Twin Falls County - - # NWS Weather Alerts (api.weather.gov) - nws: - enabled: true - tick_seconds: 60 - areas: ["ID"] - severity_min: "moderate" - user_agent: "(meshai.example.com, ops@example.com)" # REQUIRED by NWS - - # NOAA Space Weather (services.swpc.noaa.gov) - swpc: - enabled: true - - # Tropospheric ducting assessment (Open-Meteo GFS, no auth) - ducting: - enabled: true - tick_seconds: 10800 # 3 hours - latitude: 42.56 # center of mesh coverage area - longitude: -114.47 - - # NIFC Fire Perimeters (Phase 2) - fires: - enabled: false - tick_seconds: 600 - state: "US-ID" - - # Avalanche Advisories (Phase 2) - avalanche: - enabled: false - tick_seconds: 1800 - center_ids: ["SNFAC"] - season_months: [12, 1, 2, 3, 4] - - # USGS Stream Gauges (waterservices.usgs.gov) - # Find site IDs at https://waterdata.usgs.gov/nwis - usgs: - enabled: false - tick_seconds: 900 # Min 15 min per USGS guidelines - sites: [] # e.g. ["13090500", "13088000"] - - # TomTom Traffic Flow (api.tomtom.com, requires API key) - traffic: - enabled: false - tick_seconds: 300 - api_key: "" # Get key at developer.tomtom.com - corridors: [] - # Example corridors: - # - name: "I-84 Twin Falls" - # lat: 42.56 - # lon: -114.47 - - # 511 Road Conditions (state-specific, configurable base URL) - roads511: - enabled: false - tick_seconds: 300 - api_key: "" - base_url: "" # e.g. "https://511.idaho.gov/api/v2" - endpoints: ["/get/event"] - bbox: [] # [west, south, east, north] - - # NASA FIRMS Satellite Fire Detection - # Early warning via satellite hotspots, hours before official perimeters - # Get MAP_KEY at: https://firms.modaps.eosdis.nasa.gov/api/area/ - firms: - enabled: false - tick_seconds: 1800 # 30 min default - map_key: "" # Required - NASA FIRMS MAP_KEY - source: "VIIRS_SNPP_NRT" # VIIRS_SNPP_NRT, VIIRS_NOAA20_NRT, MODIS_NRT - bbox: [] # [west, south, east, north] - Required - day_range: 1 # 1-10 days of data - confidence_min: "nominal" # low, nominal, high - proximity_km: 10.0 # km to match known fire perimeters - - -# === NOTIFICATION DELIVERY (TRANSITIONAL) === -# NOTE: This notifications schema will be replaced in v0.3 by the 8-toggle model. -# These rule examples are transitional until Phase 1.2 lands. Do not extend. -# Severity levels: routine (informational), priority (needs attention), immediate (act now) -# -# Route alerts to channels (mesh, email, webhook) based on rules. -# Categories match alert types from alert_engine.py. -notifications: - enabled: false - quiet_hours_enabled: true # Master toggle for quiet hours feature - quiet_hours_start: "22:00" # Suppress non-emergency alerts during quiet hours - quiet_hours_end: "06:00" - - # Notification rules - each rule is self-contained with its own delivery config - # Default baseline rules are created on fresh install - rules: - # Emergency Broadcast - all emergencies go out immediately - - name: "Emergency Broadcast" - enabled: true - trigger_type: condition - categories: [] # Empty = all categories - min_severity: "immediate" - delivery_type: mesh_broadcast - broadcast_channel: 0 - cooldown_minutes: 5 - override_quiet: true # Send even during quiet hours - - # Infrastructure Down - critical node and infrastructure offline alerts - - name: "Infrastructure Down" - enabled: true - trigger_type: condition - categories: ["infra_offline", "critical_node_down"] - min_severity: "priority" - delivery_type: mesh_broadcast - broadcast_channel: 0 - cooldown_minutes: 30 - override_quiet: false - - # Fire Alert - wildfire proximity and new ignition - - name: "Fire Alert" - enabled: true - trigger_type: condition - categories: ["wildfire_proximity", "new_ignition"] - min_severity: "routine" - delivery_type: mesh_broadcast - broadcast_channel: 0 - cooldown_minutes: 60 - override_quiet: false - - # Severe Weather - weather warnings - - name: "Severe Weather" - enabled: true - trigger_type: condition - categories: ["weather_warning"] - min_severity: "priority" - delivery_type: mesh_broadcast - broadcast_channel: 0 - cooldown_minutes: 30 - override_quiet: false - - # Example: Fire alerts -> email - # - name: "Fire Alerts Email" - # enabled: true - # trigger_type: condition - # categories: ["wildfire_proximity", "new_ignition"] - # min_severity: "routine" - # delivery_type: email - # smtp_host: "smtp.gmail.com" - # smtp_port: 587 - # smtp_user: "you@gmail.com" - # smtp_password: "${SMTP_PASSWORD}" - # smtp_tls: true - # from_address: "meshai@yourdomain.com" - # recipients: ["admin@yourdomain.com"] - # cooldown_minutes: 30 - - # Example: All warnings -> Discord webhook - # - name: "Discord Alerts" - # enabled: true - # trigger_type: condition - # categories: [] - # min_severity: "priority" - # delivery_type: webhook - # webhook_url: "https://discord.com/api/webhooks/..." - # cooldown_minutes: 10 - - # Example: Daily health report -> mesh broadcast - # - name: "Morning Briefing" - # enabled: true - # trigger_type: schedule - # schedule_frequency: daily - # schedule_time: "07:00" - # message_type: mesh_health_summary - # delivery_type: mesh_broadcast - # broadcast_channel: 0 - - # Example: Rule with no delivery (matches and logs, but doesn't send) - # - name: "Monitor Only" - # enabled: true - # trigger_type: condition - # categories: ["battery_warning"] - # min_severity: "priority" - # delivery_type: "" # Empty = no delivery, just tracks matches - -# === WEB DASHBOARD === -dashboard: - enabled: true - port: 8080 - host: "0.0.0.0" +# MeshAI Configuration +# LLM-powered Meshtastic assistant +# +# Copy this to config.yaml and customize as needed +# For Docker: mount as /data/config.yaml + +# === BOT IDENTITY === +bot: + name: ai # Bot's display name + owner: "" # Owner's callsign (optional) + respond_to_dms: true # Respond to direct messages + filter_bbs_protocols: true # Ignore advBBS sync/notification messages + +# === MESHTASTIC CONNECTION === +connection: + type: tcp # serial | tcp + serial_port: /dev/ttyUSB0 # For serial connection + tcp_host: localhost # For TCP connection (meshtasticd) + tcp_port: 4403 + +# === RESPONSE BEHAVIOR === +response: + delay_min: 2.2 # Min delay before responding (seconds) + delay_max: 3.0 # Max delay before responding + max_length: 200 # Max chars per message chunk + max_messages: 3 # Max message chunks per response + +# === CONVERSATION HISTORY === +history: + database: /data/conversations.db + max_messages_per_user: 50 # Messages to keep per user + conversation_timeout: 86400 # Conversation expiry (seconds, 86400=24h) + auto_cleanup: true # Auto-delete old conversations + cleanup_interval_hours: 24 # How often to run cleanup + max_age_days: 30 # Delete conversations older than this + +# === MEMORY OPTIMIZATION === +memory: + enabled: true # Enable rolling summary memory + window_size: 4 # Recent message pairs to keep in full + summarize_threshold: 8 # Messages before re-summarizing + +# === MESH CONTEXT === +context: + enabled: true # Observe channel traffic for LLM context + observe_channels: [] # Channel indices to observe (empty = all) + ignore_nodes: [] # Node IDs to exclude from observation + max_age: 2592000 # Max age in seconds (default 30 days) + max_context_items: 20 # Max observations injected into LLM context + +# === LLM BACKEND === +llm: + backend: openai # openai | anthropic | google + api_key: "" # API key (or use LLM_API_KEY env var) + base_url: https://api.openai.com/v1 # API base URL + model: gpt-4o-mini # Model name + timeout: 30 # Request timeout (seconds) + system_prompt: >- + You are a helpful assistant on a Meshtastic mesh network. + Keep responses very brief - 1-2 short sentences, under 300 characters. + Only give longer answers if the user explicitly asks for detail or explanation. + Be concise but friendly. No markdown formatting. + google_grounding: false # Enable Google Search grounding (Gemini only, $35/1k queries) + +# === WEATHER === +weather: + primary: openmeteo # openmeteo | wttr | llm + fallback: llm # openmeteo | wttr | llm | none + default_location: "" # Default location for !weather (optional) + +# === MESHMONITOR INTEGRATION === +meshmonitor: + enabled: false # Enable MeshMonitor trigger sync + url: "" # MeshMonitor web UI URL (e.g. http://192.168.1.100:3333) + inject_into_prompt: true # Include trigger list in LLM prompt + refresh_interval: 300 # Seconds between trigger refreshes + +# === KNOWLEDGE BASE (RAG) === +knowledge: + enabled: false # Enable knowledge base search + db_path: "" # Path to knowledge SQLite database + top_k: 5 # Number of chunks to retrieve per query + +# === MESH DATA SOURCES === +# Connect to Meshview and/or MeshMonitor instances for live mesh +# network analysis. Supports multiple sources. Configure via TUI +# with meshai --config (Mesh Sources menu). +# +# mesh_sources: +# - name: "my-meshview" +# type: meshview +# url: "https://meshview.example.com" +# refresh_interval: 300 +# enabled: true +# +# - name: "my-meshmonitor" +# type: meshmonitor +# url: "http://192.168.1.100:3333" +# api_token: "${MM_API_TOKEN}" +# refresh_interval: 300 +# enabled: true +# +# - name: "mqtt-broker" +# type: mqtt +# host: "mqtt.meshtastic.org" +# port: 1883 +# username: "meshdev" +# password: "large4cats" +# topic_root: "msh/US" +# use_tls: false +# enabled: true +mesh_sources: [] + +# === MESH INTELLIGENCE === +# Geographic clustering and health scoring for mesh analysis. +# Requires mesh_sources to be configured with at least one data source. +# +# mesh_intelligence: +# enabled: true +# region_radius_miles: 40.0 # Radius for region clustering +# locality_radius_miles: 8.0 # Radius for locality clustering +# offline_threshold_hours: 2 # Hours before node considered offline +# packet_threshold: 500 # Non-text packets per 24h to flag +# battery_warning_percent: 30 # Battery level for warnings +# infra_overrides: [] # Node IDs to exclude from infrastructure +# region_labels: {} # Override auto-names: {"Twin Falls": "Magic Valley"} +mesh_intelligence: + enabled: false + region_radius_miles: 40.0 + locality_radius_miles: 8.0 + offline_threshold_hours: 2 + packet_threshold: 500 + battery_warning_percent: 30 + infra_overrides: [] + region_labels: {} + +# === ENVIRONMENTAL FEEDS === +# Live situational awareness from NWS, NOAA Space Weather, and Open-Meteo. +# Provides weather alerts, HF propagation assessment, and tropospheric ducting. +# +environmental: + enabled: false + nws_zones: + - "IDZ016" # Western Magic Valley + - "IDZ030" # Southern Twin Falls County + + # NWS Weather Alerts (api.weather.gov) + nws: + enabled: true + tick_seconds: 60 + areas: ["ID"] + severity_min: "moderate" + user_agent: "(meshai.example.com, ops@example.com)" # REQUIRED by NWS + + # NOAA Space Weather (services.swpc.noaa.gov) + swpc: + enabled: true + + # Tropospheric ducting assessment (Open-Meteo GFS, no auth) + ducting: + enabled: true + tick_seconds: 10800 # 3 hours + latitude: 42.56 # center of mesh coverage area + longitude: -114.47 + + # NIFC Fire Perimeters (Phase 2) + fires: + enabled: false + tick_seconds: 600 + state: "US-ID" + + # Avalanche Advisories (Phase 2) + avalanche: + enabled: false + tick_seconds: 1800 + center_ids: ["SNFAC"] + season_months: [12, 1, 2, 3, 4] + + # USGS Stream Gauges (waterservices.usgs.gov) + # Find site IDs at https://waterdata.usgs.gov/nwis + usgs: + enabled: false + tick_seconds: 900 # Min 15 min per USGS guidelines + sites: [] # e.g. ["13090500", "13088000"] + + # TomTom Traffic Flow (api.tomtom.com, requires API key) + traffic: + enabled: false + tick_seconds: 300 + api_key: "" # Get key at developer.tomtom.com + corridors: [] + # Example corridors: + # - name: "I-84 Twin Falls" + # lat: 42.56 + # lon: -114.47 + + # 511 Road Conditions (state-specific, configurable base URL) + roads511: + enabled: false + tick_seconds: 300 + api_key: "" + base_url: "" # e.g. "https://511.idaho.gov/api/v2" + endpoints: ["/get/event"] + bbox: [] # [west, south, east, north] + + # NASA FIRMS Satellite Fire Detection + # Early warning via satellite hotspots, hours before official perimeters + # Get MAP_KEY at: https://firms.modaps.eosdis.nasa.gov/api/area/ + firms: + enabled: false + tick_seconds: 1800 # 30 min default + map_key: "" # Required - NASA FIRMS MAP_KEY + source: "VIIRS_SNPP_NRT" # VIIRS_SNPP_NRT, VIIRS_NOAA20_NRT, MODIS_NRT + bbox: [] # [west, south, east, north] - Required + day_range: 1 # 1-10 days of data + confidence_min: "nominal" # low, nominal, high + proximity_km: 10.0 # km to match known fire perimeters + + +# === NOTIFICATION DELIVERY (TRANSITIONAL) === +# NOTE: This notifications schema will be replaced in v0.3 by the 8-toggle model. +# These rule examples are transitional until Phase 1.2 lands. Do not extend. +# Severity levels: routine (informational), priority (needs attention), immediate (act now) +# +# Route alerts to channels (mesh, email, webhook) based on rules. +# Categories match alert types from alert_engine.py. +notifications: + enabled: false + quiet_hours_enabled: true # Master toggle for quiet hours feature + quiet_hours_start: "22:00" # Suppress non-emergency alerts during quiet hours + quiet_hours_end: "06:00" + + # Digest scheduler settings + # The digest collects priority/routine events and delivers a summary + # at the configured time to rules with trigger_type='schedule' and + # schedule_match='digest'. + digest: + schedule: "07:00" # HH:MM local time to fire digest + include: [] # Toggle names to include (empty = default set) + # Default set: weather, fire, seismic, avalanche, roads, mesh_health, tracking, other + # Excludes rf_propagation by default + # Example: include: ["weather", "fire", "mesh_health"] + + # Notification rules - each rule is self-contained with its own delivery config + # Default baseline rules are created on fresh install + rules: + # Emergency Broadcast - all emergencies go out immediately + - name: "Emergency Broadcast" + enabled: true + trigger_type: condition + categories: [] # Empty = all categories + min_severity: "immediate" + delivery_type: mesh_broadcast + broadcast_channel: 0 + cooldown_minutes: 5 + override_quiet: true # Send even during quiet hours + + # Infrastructure Down - critical node and infrastructure offline alerts + - name: "Infrastructure Down" + enabled: true + trigger_type: condition + categories: ["infra_offline", "critical_node_down"] + min_severity: "priority" + delivery_type: mesh_broadcast + broadcast_channel: 0 + cooldown_minutes: 30 + override_quiet: false + + # Fire Alert - wildfire proximity and new ignition + - name: "Fire Alert" + enabled: true + trigger_type: condition + categories: ["wildfire_proximity", "new_ignition"] + min_severity: "routine" + delivery_type: mesh_broadcast + broadcast_channel: 0 + cooldown_minutes: 60 + override_quiet: false + + # Severe Weather - weather warnings + - name: "Severe Weather" + enabled: true + trigger_type: condition + categories: ["weather_warning"] + min_severity: "priority" + delivery_type: mesh_broadcast + broadcast_channel: 0 + cooldown_minutes: 30 + override_quiet: false + + # Example: Morning Digest -> mesh broadcast + # Delivers the accumulated digest at the configured schedule time + # - name: "Morning Digest Mesh" + # enabled: false + # trigger_type: schedule + # schedule_match: "digest" # Required for digest delivery + # delivery_type: mesh_broadcast + # broadcast_channel: 0 + + # Example: Morning Digest -> email + # - name: "Morning Digest Email" + # enabled: false + # trigger_type: schedule + # schedule_match: "digest" + # delivery_type: email + # smtp_host: "smtp.gmail.com" + # smtp_port: 587 + # smtp_user: "you@gmail.com" + # smtp_password: "${SMTP_PASSWORD}" + # smtp_tls: true + # from_address: "meshai@yourdomain.com" + # recipients: ["admin@yourdomain.com"] + + # Example: Fire alerts -> email + # - name: "Fire Alerts Email" + # enabled: true + # trigger_type: condition + # categories: ["wildfire_proximity", "new_ignition"] + # min_severity: "routine" + # delivery_type: email + # smtp_host: "smtp.gmail.com" + # smtp_port: 587 + # smtp_user: "you@gmail.com" + # smtp_password: "${SMTP_PASSWORD}" + # smtp_tls: true + # from_address: "meshai@yourdomain.com" + # recipients: ["admin@yourdomain.com"] + # cooldown_minutes: 30 + + # Example: All warnings -> Discord webhook + # - name: "Discord Alerts" + # enabled: true + # trigger_type: condition + # categories: [] + # min_severity: "priority" + # delivery_type: webhook + # webhook_url: "https://discord.com/api/webhooks/..." + # cooldown_minutes: 10 + + # Example: Rule with no delivery (matches and logs, but doesn't send) + # - name: "Monitor Only" + # enabled: true + # trigger_type: condition + # categories: ["battery_warning"] + # min_severity: "priority" + # delivery_type: "" # Empty = no delivery, just tracks matches + +# === WEB DASHBOARD === +dashboard: + enabled: true + port: 8080 + host: "0.0.0.0" diff --git a/meshai/config.py b/meshai/config.py index 441f3dc..6651f20 100644 --- a/meshai/config.py +++ b/meshai/config.py @@ -1,749 +1,761 @@ -"""Configuration management for MeshAI.""" - -import logging -import os -from dataclasses import dataclass, field -from pathlib import Path -from typing import Optional - -import yaml - -_config_logger = logging.getLogger(__name__) - - -@dataclass -class BotConfig: - """Bot identity and trigger settings.""" - - name: str = "ai" - owner: str = "" - respond_to_dms: bool = True - filter_bbs_protocols: bool = True - - -@dataclass -class ConnectionConfig: - """Meshtastic connection settings.""" - - type: str = "serial" # serial or tcp - serial_port: str = "/dev/ttyUSB0" - tcp_host: str = "192.168.1.100" - tcp_port: int = 4403 - - -@dataclass -class ResponseConfig: - """Response behavior settings.""" - - delay_min: float = 1.5 - delay_max: float = 2.5 - max_length: int = 200 - max_messages: int = 3 - - -@dataclass -class HistoryConfig: - """Conversation history settings.""" - - database: str = "conversations.db" - max_messages_per_user: int = 50 - conversation_timeout: int = 86400 # 24 hours - - # Cleanup settings - auto_cleanup: bool = True - cleanup_interval_hours: int = 24 - max_age_days: int = 30 # Delete conversations older than this - - -@dataclass -class MemoryConfig: - """Rolling summary memory settings.""" - - enabled: bool = True # Enable memory optimization - - # MQTT-specific fields (type=mqtt only) - host: str = "" # MQTT broker hostname - port: int = 1883 # MQTT broker port (1883 plain, 8883 TLS) - username: str = "" # MQTT username (optional) - password: str = "" # MQTT password (optional, supports ) - topic_root: str = "msh/US" # Topic root to subscribe to - use_tls: bool = False # Enable TLS for MQTT connection - window_size: int = 4 # Recent message pairs to keep in full - summarize_threshold: int = 8 # Messages before re-summarizing - - -@dataclass -class ContextConfig: - """Passive mesh context settings.""" - - enabled: bool = True - - # MQTT-specific fields (type=mqtt only) - host: str = "" # MQTT broker hostname - port: int = 1883 # MQTT broker port (1883 plain, 8883 TLS) - username: str = "" # MQTT username (optional) - password: str = "" # MQTT password (optional, supports ) - topic_root: str = "msh/US" # Topic root to subscribe to - use_tls: bool = False # Enable TLS for MQTT connection - observe_channels: list[int] = field(default_factory=list) # Empty = all channels - ignore_nodes: list[str] = field(default_factory=list) # Node IDs to ignore - max_age: int = 2_592_000 # 30 days in seconds - max_context_items: int = 20 # Max observations injected into LLM context - - -@dataclass -class CommandsConfig: - """Command settings.""" - - enabled: bool = True - - # MQTT-specific fields (type=mqtt only) - host: str = "" # MQTT broker hostname - port: int = 1883 # MQTT broker port (1883 plain, 8883 TLS) - username: str = "" # MQTT username (optional) - password: str = "" # MQTT password (optional, supports ) - topic_root: str = "msh/US" # Topic root to subscribe to - use_tls: bool = False # Enable TLS for MQTT connection - prefix: str = "!" - disabled_commands: list[str] = field(default_factory=list) - custom_commands: dict = field(default_factory=dict) - - -@dataclass -class LLMConfig: - """LLM backend settings.""" - - backend: str = "openai" # openai, anthropic, google - api_key: str = "" - base_url: str = "https://api.openai.com/v1" - model: str = "gpt-4o-mini" - timeout: int = 30 - max_response_tokens: int = 8192 # Let LLM generate full responses; chunker handles size - - system_prompt: str = ( - "RESPONSE RULES:\n" - "- For casual conversation, keep responses brief (1-2 sentences).\n" - "- For mesh health questions, give detailed data-driven responses.\n" - "- Be concise but friendly. No markdown formatting.\n" - "- If asked about mesh activity and no recent traffic is shown, say you haven't " - "observed any yet.\n" - "- When asked about yourself or commands, answer conversationally based on " - "the command list provided below. Don't dump lists unless asked.\n" - "- You are part of the freq51 mesh.\n" - "- When asked about yourself or commands, answer conversationally. Don't dump lists.\n" - "- You are part of the freq51 mesh in the Twin Falls, Idaho area.\n" - "- NEVER use markdown formatting (no bold, no asterisks, no bullet points, no numbered lists). Plain text only.\n" - "- NEVER say 'Want me to keep going?' -- the system handles continuation prompts automatically." - ) - use_system_prompt: bool = True # Toggle to disable sending system prompt - web_search: bool = False # Enable web search (Open WebUI feature) - google_grounding: bool = False # Enable Google Search grounding (Gemini only) - - -@dataclass -class OpenMeteoConfig: - """Open-Meteo weather provider settings.""" - - url: str = "https://api.open-meteo.com/v1" - - -@dataclass -class WttrConfig: - """wttr.in weather provider settings.""" - - url: str = "https://wttr.in" - - -@dataclass -class WeatherConfig: - """Weather command settings.""" - - primary: str = "openmeteo" # openmeteo, wttr, llm - fallback: str = "llm" # openmeteo, wttr, llm, none - default_location: str = "" - openmeteo: OpenMeteoConfig = field(default_factory=OpenMeteoConfig) - wttr: WttrConfig = field(default_factory=WttrConfig) - - -@dataclass -class MeshMonitorConfig: - """MeshMonitor trigger sync settings.""" - - enabled: bool = False - url: str = "" # e.g., http://100.64.0.11:3333 - inject_into_prompt: bool = True # Tell LLM about MeshMonitor commands - refresh_interval: int = 30 # Tick interval in seconds (default 30) - polite_mode: bool = False # Reduces polling frequency for shared instances # Seconds between refreshes - - -@dataclass -class KnowledgeConfig: - """Knowledge base settings.""" - - enabled: bool = False - backend: str = "auto" # "qdrant", "sqlite", or "auto" (try qdrant, fall back to sqlite) - - # Qdrant / RECON settings - qdrant_host: str = "" # e.g., "192.168.1.150" - qdrant_port: int = 6333 - qdrant_collection: str = "recon_knowledge_hybrid" - tei_host: str = "" # TEI embedding service host - tei_port: int = 8090 - sparse_host: str = "" # Sparse embedding service host - sparse_port: int = 8091 - use_sparse: bool = True # Enable hybrid dense+sparse search - - # SQLite fallback settings - db_path: str = "" - top_k: int = 5 - - -@dataclass -class MeshSourceConfig: - """Configuration for a mesh data source.""" - - name: str = "" - type: str = "" # "meshview", "meshmonitor", or "mqtt" - url: str = "" - api_token: str = "" # MeshMonitor only, supports ${ENV_VAR} - refresh_interval: int = 30 # Tick interval in seconds (default 30) - polite_mode: bool = False # Reduces polling frequency for shared instances - enabled: bool = True - - # MQTT-specific fields (type=mqtt only) - host: str = "" # MQTT broker hostname - port: int = 1883 # MQTT broker port (1883 plain, 8883 TLS) - username: str = "" # MQTT username (optional) - password: str = "" # MQTT password (optional, supports ) - topic_root: str = "msh/US" # Topic root to subscribe to - use_tls: bool = False # Enable TLS for MQTT connection - - -@dataclass -class RegionAnchor: - """A fixed region anchor point with geographic context.""" - - name: str = "" - lat: float = 0.0 - lon: float = 0.0 - local_name: str = "" # e.g., "Magic Valley" - description: str = "" # e.g., "Twin Falls, Burley, Jerome along I-84/US-93" - aliases: list[str] = field(default_factory=list) # e.g., ["southern Idaho", "magic valley"] - cities: list[str] = field(default_factory=list) # e.g., ["Twin Falls", "Burley", "Jerome"] - nws_zones: list[str] = field(default_factory=list) # NWS zone codes (e.g., ["IDZ016", "IDZ030"]) - - -@dataclass -class AlertRulesConfig: - """Per-condition alert toggles and thresholds.""" - - # Infrastructure - infra_offline: bool = True - infra_recovery: bool = True - new_router: bool = True - - # Power - battery_trend_declining: bool = True - battery_warning: bool = True - battery_critical: bool = True - battery_emergency: bool = True - battery_warning_threshold: int = 30 - battery_critical_threshold: int = 15 - battery_emergency_threshold: int = 5 - # Voltage-based thresholds (more accurate than percentage) - battery_warning_voltage: float = 3.60 - battery_critical_voltage: float = 3.50 - battery_emergency_voltage: float = 3.40 - power_source_change: bool = True - solar_not_charging: bool = True - - # Utilization - sustained_high_util: bool = True - high_util_threshold: float = 40.0 - high_util_hours: int = 6 - packet_flood: bool = True - packet_flood_threshold: int = 10 - - # Coverage - infra_single_gateway: bool = True - feeder_offline: bool = True - region_total_blackout: bool = True - - # Health Scores - mesh_score_alert: bool = True - mesh_score_threshold: int = 65 - region_score_alert: bool = True - region_score_threshold: int = 60 - - -@dataclass -class MeshIntelligenceConfig: - """Mesh intelligence and health scoring settings.""" - - enabled: bool = False - regions: list[RegionAnchor] = field(default_factory=list) # Fixed region anchors - locality_radius_miles: float = 8.0 # Radius for locality clustering within regions - offline_threshold_hours: int = 2 # Hours before node considered offline - packet_threshold: int = 500 # Non-text packets per 24h to flag - # TODO: behavior pillar uses wrong scale - see meshai-v03-notification-handoff.md bug #2 - battery_warning_percent: int = 30 # Battery level for warnings - - # Alert settings - critical_nodes: list[str] = field(default_factory=list) # Short names of critical nodes (e.g., ["MHR", "HPR"]) - alert_channel: int = -1 # Channel to broadcast alerts on. -1 = disabled, 0+ = channel index - alert_cooldown_minutes: int = 30 # Min minutes between repeated alerts for same condition - alert_rules: AlertRulesConfig = field(default_factory=AlertRulesConfig) - - -# Environmental feed configs -@dataclass -class NWSConfig: - """NWS weather alerts settings.""" - - enabled: bool = True - - # MQTT-specific fields (type=mqtt only) - host: str = "" # MQTT broker hostname - port: int = 1883 # MQTT broker port (1883 plain, 8883 TLS) - username: str = "" # MQTT username (optional) - password: str = "" # MQTT password (optional, supports ) - topic_root: str = "msh/US" # Topic root to subscribe to - use_tls: bool = False # Enable TLS for MQTT connection - tick_seconds: int = 60 - areas: list = field(default_factory=lambda: ["ID"]) - severity_min: str = "moderate" - user_agent: str = "" - - -@dataclass -class SWPCConfig: - """NOAA Space Weather settings.""" - - enabled: bool = True - - # MQTT-specific fields (type=mqtt only) - host: str = "" # MQTT broker hostname - port: int = 1883 # MQTT broker port (1883 plain, 8883 TLS) - username: str = "" # MQTT username (optional) - password: str = "" # MQTT password (optional, supports ) - topic_root: str = "msh/US" # Topic root to subscribe to - use_tls: bool = False # Enable TLS for MQTT connection - - -@dataclass -class DuctingConfig: - """Tropospheric ducting settings.""" - - enabled: bool = True - - # MQTT-specific fields (type=mqtt only) - host: str = "" # MQTT broker hostname - port: int = 1883 # MQTT broker port (1883 plain, 8883 TLS) - username: str = "" # MQTT username (optional) - password: str = "" # MQTT password (optional, supports ) - topic_root: str = "msh/US" # Topic root to subscribe to - use_tls: bool = False # Enable TLS for MQTT connection - tick_seconds: int = 10800 # 3 hours - latitude: float = 42.56 # Twin Falls area default - longitude: float = -114.47 - - -@dataclass -class NICFFiresConfig: - """NIFC fire perimeters settings (Phase 2).""" - - enabled: bool = False - tick_seconds: int = 600 - state: str = "US-ID" - - -@dataclass -class AvalancheConfig: - """Avalanche advisory settings (Phase 2).""" - - enabled: bool = False - tick_seconds: int = 1800 - center_ids: list = field(default_factory=lambda: ["SNFAC"]) - season_months: list = field(default_factory=lambda: [12, 1, 2, 3, 4]) - - -@dataclass -class USGSConfig: - """USGS stream gauge settings.""" - - enabled: bool = False - tick_seconds: int = 900 # Minimum 15 min per USGS guidelines - sites: list = field(default_factory=list) # Site IDs, e.g. ["13090500"] - flood_thresholds: dict = field(default_factory=dict) # {site_id: {flow: X, height: Y}} - - -@dataclass -class TomTomConfig: - """TomTom traffic flow settings.""" - - enabled: bool = False - tick_seconds: int = 300 - api_key: str = "" # Supports ${ENV_VAR} - corridors: list = field(default_factory=list) # [{name, lat, lon}, ...] - - -@dataclass -class Roads511Config: - """511 road conditions settings.""" - - enabled: bool = False - tick_seconds: int = 300 - api_key: str = "" # Supports ${ENV_VAR} - base_url: str = "" # State-specific, e.g. "https://511.idaho.gov/api/v2" - endpoints: list = field(default_factory=lambda: ["/get/event"]) - bbox: list = field(default_factory=list) # [west, south, east, north] - - -@dataclass -class FIRMSConfig: - """NASA FIRMS satellite fire hotspot settings.""" - - enabled: bool = False - tick_seconds: int = 1800 # 30 min default - map_key: str = "" # NASA FIRMS MAP_KEY, get at https://firms.modaps.eosdis.nasa.gov/api/area/ - source: str = "VIIRS_SNPP_NRT" # VIIRS_SNPP_NRT, VIIRS_NOAA20_NRT, MODIS_NRT - bbox: list = field(default_factory=list) # [west, south, east, north] - day_range: int = 1 # 1-10 days of data - confidence_min: str = "nominal" # low, nominal, high - proximity_km: float = 10.0 # km to match known fire - - -@dataclass -class EnvironmentalConfig: - """Environmental feeds settings.""" - - enabled: bool = False - nws_zones: list = field(default_factory=lambda: ["IDZ016", "IDZ030"]) - nws: NWSConfig = field(default_factory=NWSConfig) - swpc: SWPCConfig = field(default_factory=SWPCConfig) - ducting: DuctingConfig = field(default_factory=DuctingConfig) - fires: NICFFiresConfig = field(default_factory=NICFFiresConfig) - avalanche: AvalancheConfig = field(default_factory=AvalancheConfig) - usgs: USGSConfig = field(default_factory=USGSConfig) - traffic: TomTomConfig = field(default_factory=TomTomConfig) - roads511: Roads511Config = field(default_factory=Roads511Config) - firms: FIRMSConfig = field(default_factory=FIRMSConfig) - - -@dataclass -class NotificationRuleConfig: - """Self-contained notification rule with inline delivery config.""" - - name: str = "" - enabled: bool = True - - # Trigger type - trigger_type: str = "condition" # "condition" or "schedule" - - # Condition trigger fields - categories: list = field(default_factory=list) # Empty = all categories - min_severity: str = "routine" - - # Schedule trigger fields - schedule_frequency: str = "daily" # daily, twice_daily, weekly, custom - schedule_time: str = "07:00" - schedule_time_2: str = "19:00" # For twice_daily - schedule_days: list = field(default_factory=list) # For weekly - schedule_cron: str = "" # For custom - message_type: str = "mesh_health_summary" - custom_message: str = "" - - # Delivery type - delivery_type: str = "" # mesh_broadcast, mesh_dm, email, webhook - - # Mesh broadcast fields - broadcast_channel: int = 0 - - # Mesh DM fields - node_ids: list = field(default_factory=list) - - # Email fields - smtp_host: str = "" - smtp_port: int = 587 - smtp_user: str = "" - smtp_password: str = "" - smtp_tls: bool = True - from_address: str = "" - recipients: list = field(default_factory=list) - - # Webhook fields - webhook_url: str = "" - webhook_headers: dict = field(default_factory=dict) - - # Behavior - cooldown_minutes: int = 10 - override_quiet: bool = False - - # Legacy field for migration (ignored in new format) - channel_ids: list = field(default_factory=list) - - -@dataclass -class NotificationsConfig: - """Notification system settings.""" - - enabled: bool = False - quiet_hours_enabled: bool = True # Master toggle for quiet hours - quiet_hours_start: str = "22:00" - quiet_hours_end: str = "06:00" - rules: list = field(default_factory=list) # List of NotificationRuleConfig - -@dataclass -class DashboardConfig: - """Web dashboard settings.""" - - enabled: bool = True - port: int = 8080 - host: str = "0.0.0.0" - -@dataclass -class Config: - """Main configuration container.""" - - # Global settings - timezone: str = "America/Boise" # IANA timezone for local time display - - bot: BotConfig = field(default_factory=BotConfig) - connection: ConnectionConfig = field(default_factory=ConnectionConfig) - response: ResponseConfig = field(default_factory=ResponseConfig) - history: HistoryConfig = field(default_factory=HistoryConfig) - memory: MemoryConfig = field(default_factory=MemoryConfig) - context: ContextConfig = field(default_factory=ContextConfig) - commands: CommandsConfig = field(default_factory=CommandsConfig) - llm: LLMConfig = field(default_factory=LLMConfig) - weather: WeatherConfig = field(default_factory=WeatherConfig) - meshmonitor: MeshMonitorConfig = field(default_factory=MeshMonitorConfig) - knowledge: KnowledgeConfig = field(default_factory=KnowledgeConfig) - mesh_sources: list[MeshSourceConfig] = field(default_factory=list) - mesh_intelligence: MeshIntelligenceConfig = field(default_factory=MeshIntelligenceConfig) - environmental: EnvironmentalConfig = field(default_factory=EnvironmentalConfig) - dashboard: DashboardConfig = field(default_factory=DashboardConfig) - notifications: NotificationsConfig = field(default_factory=NotificationsConfig) - - _config_path: Optional[Path] = field(default=None, repr=False) - - def resolve_api_key(self) -> str: - """Resolve API key from config or environment.""" - if self.llm.api_key: - # Check if it's an env var reference like ${LLM_API_KEY} - if self.llm.api_key.startswith("${") and self.llm.api_key.endswith("}"): - env_var = self.llm.api_key[2:-1] - return os.environ.get(env_var, "") - return self.llm.api_key - # Fall back to common env vars - for env_var in ["LLM_API_KEY", "OPENAI_API_KEY", "ANTHROPIC_API_KEY"]: - if value := os.environ.get(env_var): - return value - return "" - - -def _migrate_legacy_channels(notifications, data: dict): - """Migrate legacy channels+rules format to self-contained rules.""" - old_channels = data.get("channels", []) - old_rules = data.get("rules", []) - - if not old_channels: - return - - _config_logger.info("Migrating %d legacy notification channels to inline rules", len(old_channels)) - - # Build channel lookup - channel_map = {} - for ch in old_channels: - if isinstance(ch, dict): - channel_map[ch.get("id", "")] = ch - - # Convert each old rule + referenced channels to new format - migrated_rules = [] - for old_rule in old_rules: - if not isinstance(old_rule, dict): - continue - - channel_ids = old_rule.get("channel_ids", []) - if not channel_ids: - continue - - for ch_id in channel_ids: - ch = channel_map.get(ch_id) - if not ch: - continue - - # Create new rule with inline delivery config - new_rule = NotificationRuleConfig( - name=old_rule.get("name", "") or ch_id, - enabled=ch.get("enabled", True), - trigger_type="condition", - categories=old_rule.get("categories", []), - min_severity=old_rule.get("min_severity", "priority"), - delivery_type=ch.get("type", "mesh_broadcast"), - broadcast_channel=ch.get("channel_index", 0), - node_ids=ch.get("node_ids", []), - smtp_host=ch.get("smtp_host", ""), - smtp_port=ch.get("smtp_port", 587), - smtp_user=ch.get("smtp_user", ""), - smtp_password=ch.get("smtp_password", ""), - smtp_tls=ch.get("smtp_tls", True), - from_address=ch.get("from_address", ""), - recipients=ch.get("recipients", []), - webhook_url=ch.get("url", ""), - webhook_headers=ch.get("headers", {}), - cooldown_minutes=10, - override_quiet=old_rule.get("override_quiet", False), - ) - migrated_rules.append(new_rule) - - # Replace rules with migrated ones (migrated rules come first, then any new-format rules) - if migrated_rules: - # Keep only non-migrated rules (those without channel_ids) - existing_new_rules = [r for r in notifications.rules if not getattr(r, 'channel_ids', [])] - notifications.rules = migrated_rules + existing_new_rules - _config_logger.info("Migrated to %d self-contained rules", len(notifications.rules)) - - -def _dict_to_dataclass(cls, data: dict): - """Recursively convert dict to dataclass, handling nested structures.""" - if data is None: - return cls() - - field_types = {f.name: f.type for f in cls.__dataclass_fields__.values()} - kwargs = {} - - for key, value in data.items(): - if key.startswith("_"): - continue - if key not in field_types: - continue - - field_type = field_types[key] - - # Handle nested dataclasses - if hasattr(field_type, "__dataclass_fields__") and isinstance(value, dict): - kwargs[key] = _dict_to_dataclass(field_type, value) - # Handle list of MeshSourceConfig - elif key == "mesh_sources" and isinstance(value, list): - kwargs[key] = [ - _dict_to_dataclass(MeshSourceConfig, item) - if isinstance(item, dict) else item - for item in value - ] - # Handle list of RegionAnchor - elif key == "regions" and isinstance(value, list): - kwargs[key] = [ - _dict_to_dataclass(RegionAnchor, item) - if isinstance(item, dict) else item - for item in value - ] - # Handle AlertRulesConfig - elif key == "alert_rules" and isinstance(value, dict): - kwargs[key] = _dict_to_dataclass(AlertRulesConfig, value) - # Handle nested environmental configs - elif key == "nws" and isinstance(value, dict): - kwargs[key] = _dict_to_dataclass(NWSConfig, value) - elif key == "swpc" and isinstance(value, dict): - kwargs[key] = _dict_to_dataclass(SWPCConfig, value) - elif key == "ducting" and isinstance(value, dict): - kwargs[key] = _dict_to_dataclass(DuctingConfig, value) - elif key == "fires" and isinstance(value, dict): - kwargs[key] = _dict_to_dataclass(NICFFiresConfig, value) - elif key == "avalanche" and isinstance(value, dict): - kwargs[key] = _dict_to_dataclass(AvalancheConfig, value) - elif key == "usgs" and isinstance(value, dict): - kwargs[key] = _dict_to_dataclass(USGSConfig, value) - elif key == "traffic" and isinstance(value, dict): - kwargs[key] = _dict_to_dataclass(TomTomConfig, value) - elif key == "roads511" and isinstance(value, dict): - kwargs[key] = _dict_to_dataclass(Roads511Config, value) - elif key == "firms" and isinstance(value, dict): - kwargs[key] = _dict_to_dataclass(FIRMSConfig, value) - elif key == "dashboard" and isinstance(value, dict): - kwargs[key] = _dict_to_dataclass(DashboardConfig, value) - elif key == "notifications" and isinstance(value, dict): - notifications = _dict_to_dataclass(NotificationsConfig, value) - if "rules" in value and isinstance(value["rules"], list): - notifications.rules = [_dict_to_dataclass(NotificationRuleConfig, r) if isinstance(r, dict) else r for r in value["rules"]] - # Migrate old channels+rules format if present - if "channels" in value and isinstance(value["channels"], list) and value["channels"]: - _migrate_legacy_channels(notifications, value) - kwargs[key] = notifications - else: - kwargs[key] = value - - return cls(**kwargs) - - -def _dataclass_to_dict(obj) -> dict: - """Recursively convert dataclass to dict for YAML serialization.""" - if not hasattr(obj, "__dataclass_fields__"): - return obj - - result = {} - for field_name in obj.__dataclass_fields__: - if field_name.startswith("_"): - continue - value = getattr(obj, field_name) - if hasattr(value, "__dataclass_fields__"): - result[field_name] = _dataclass_to_dict(value) - elif isinstance(value, list): - # Handle list of dataclasses (like mesh_sources) - result[field_name] = [ - _dataclass_to_dict(item) if hasattr(item, "__dataclass_fields__") else item - for item in value - ] - else: - result[field_name] = value - return result - - -def load_config(config_path: Optional[Path] = None) -> Config: - """Load configuration from YAML file. - - Args: - config_path: Path to config file. Defaults to ./config.yaml - - Returns: - Config object with loaded settings - """ - if config_path is None: - config_path = Path("config.yaml") - - config_path = Path(config_path) - - if not config_path.exists(): - # Return default config if file doesn't exist - config = Config() - config._config_path = config_path - return config - - with open(config_path, "r") as f: - data = yaml.safe_load(f) or {} - - config = _dict_to_dataclass(Config, data) - config._config_path = config_path - return config - - -def save_config(config: Config, config_path: Optional[Path] = None) -> None: - """Save configuration to YAML file. - - Args: - config: Config object to save - config_path: Path to save to. Uses config._config_path if not specified - """ - if config_path is None: - config_path = config._config_path or Path("config.yaml") - - config_path = Path(config_path) - - data = _dataclass_to_dict(config) - - # Add header comment - header = "# MeshAI Configuration\n# Generated by meshai --config\n\n" - - with open(config_path, "w") as f: - f.write(header) - yaml.dump(data, f, default_flow_style=False, sort_keys=False, allow_unicode=True) +"""Configuration management for MeshAI.""" + +import logging +import os +from dataclasses import dataclass, field +from pathlib import Path +from typing import Optional + +import yaml + +_config_logger = logging.getLogger(__name__) + + +@dataclass +class BotConfig: + """Bot identity and trigger settings.""" + + name: str = "ai" + owner: str = "" + respond_to_dms: bool = True + filter_bbs_protocols: bool = True + + +@dataclass +class ConnectionConfig: + """Meshtastic connection settings.""" + + type: str = "serial" # serial or tcp + serial_port: str = "/dev/ttyUSB0" + tcp_host: str = "192.168.1.100" + tcp_port: int = 4403 + + +@dataclass +class ResponseConfig: + """Response behavior settings.""" + + delay_min: float = 1.5 + delay_max: float = 2.5 + max_length: int = 200 + max_messages: int = 3 + + +@dataclass +class HistoryConfig: + """Conversation history settings.""" + + database: str = "conversations.db" + max_messages_per_user: int = 50 + conversation_timeout: int = 86400 # 24 hours + + # Cleanup settings + auto_cleanup: bool = True + cleanup_interval_hours: int = 24 + max_age_days: int = 30 # Delete conversations older than this + + +@dataclass +class MemoryConfig: + """Rolling summary memory settings.""" + + enabled: bool = True # Enable memory optimization + + # MQTT-specific fields (type=mqtt only) + host: str = "" # MQTT broker hostname + port: int = 1883 # MQTT broker port (1883 plain, 8883 TLS) + username: str = "" # MQTT username (optional) + password: str = "" # MQTT password (optional, supports ) + topic_root: str = "msh/US" # Topic root to subscribe to + use_tls: bool = False # Enable TLS for MQTT connection + window_size: int = 4 # Recent message pairs to keep in full + summarize_threshold: int = 8 # Messages before re-summarizing + + +@dataclass +class ContextConfig: + """Passive mesh context settings.""" + + enabled: bool = True + + # MQTT-specific fields (type=mqtt only) + host: str = "" # MQTT broker hostname + port: int = 1883 # MQTT broker port (1883 plain, 8883 TLS) + username: str = "" # MQTT username (optional) + password: str = "" # MQTT password (optional, supports ) + topic_root: str = "msh/US" # Topic root to subscribe to + use_tls: bool = False # Enable TLS for MQTT connection + observe_channels: list[int] = field(default_factory=list) # Empty = all channels + ignore_nodes: list[str] = field(default_factory=list) # Node IDs to ignore + max_age: int = 2_592_000 # 30 days in seconds + max_context_items: int = 20 # Max observations injected into LLM context + + +@dataclass +class CommandsConfig: + """Command settings.""" + + enabled: bool = True + + # MQTT-specific fields (type=mqtt only) + host: str = "" # MQTT broker hostname + port: int = 1883 # MQTT broker port (1883 plain, 8883 TLS) + username: str = "" # MQTT username (optional) + password: str = "" # MQTT password (optional, supports ) + topic_root: str = "msh/US" # Topic root to subscribe to + use_tls: bool = False # Enable TLS for MQTT connection + prefix: str = "!" + disabled_commands: list[str] = field(default_factory=list) + custom_commands: dict = field(default_factory=dict) + + +@dataclass +class LLMConfig: + """LLM backend settings.""" + + backend: str = "openai" # openai, anthropic, google + api_key: str = "" + base_url: str = "https://api.openai.com/v1" + model: str = "gpt-4o-mini" + timeout: int = 30 + max_response_tokens: int = 8192 # Let LLM generate full responses; chunker handles size + + system_prompt: str = ( + "RESPONSE RULES:\n" + "- For casual conversation, keep responses brief (1-2 sentences).\n" + "- For mesh health questions, give detailed data-driven responses.\n" + "- Be concise but friendly. No markdown formatting.\n" + "- If asked about mesh activity and no recent traffic is shown, say you haven't " + "observed any yet.\n" + "- When asked about yourself or commands, answer conversationally based on " + "the command list provided below. Don't dump lists unless asked.\n" + "- You are part of the freq51 mesh.\n" + "- When asked about yourself or commands, answer conversationally. Don't dump lists.\n" + "- You are part of the freq51 mesh in the Twin Falls, Idaho area.\n" + "- NEVER use markdown formatting (no bold, no asterisks, no bullet points, no numbered lists). Plain text only.\n" + "- NEVER say 'Want me to keep going?' -- the system handles continuation prompts automatically." + ) + use_system_prompt: bool = True # Toggle to disable sending system prompt + web_search: bool = False # Enable web search (Open WebUI feature) + google_grounding: bool = False # Enable Google Search grounding (Gemini only) + + +@dataclass +class OpenMeteoConfig: + """Open-Meteo weather provider settings.""" + + url: str = "https://api.open-meteo.com/v1" + + +@dataclass +class WttrConfig: + """wttr.in weather provider settings.""" + + url: str = "https://wttr.in" + + +@dataclass +class WeatherConfig: + """Weather command settings.""" + + primary: str = "openmeteo" # openmeteo, wttr, llm + fallback: str = "llm" # openmeteo, wttr, llm, none + default_location: str = "" + openmeteo: OpenMeteoConfig = field(default_factory=OpenMeteoConfig) + wttr: WttrConfig = field(default_factory=WttrConfig) + + +@dataclass +class MeshMonitorConfig: + """MeshMonitor trigger sync settings.""" + + enabled: bool = False + url: str = "" # e.g., http://100.64.0.11:3333 + inject_into_prompt: bool = True # Tell LLM about MeshMonitor commands + refresh_interval: int = 30 # Tick interval in seconds (default 30) + polite_mode: bool = False # Reduces polling frequency for shared instances # Seconds between refreshes + + +@dataclass +class KnowledgeConfig: + """Knowledge base settings.""" + + enabled: bool = False + backend: str = "auto" # "qdrant", "sqlite", or "auto" (try qdrant, fall back to sqlite) + + # Qdrant / RECON settings + qdrant_host: str = "" # e.g., "192.168.1.150" + qdrant_port: int = 6333 + qdrant_collection: str = "recon_knowledge_hybrid" + tei_host: str = "" # TEI embedding service host + tei_port: int = 8090 + sparse_host: str = "" # Sparse embedding service host + sparse_port: int = 8091 + use_sparse: bool = True # Enable hybrid dense+sparse search + + # SQLite fallback settings + db_path: str = "" + top_k: int = 5 + + +@dataclass +class MeshSourceConfig: + """Configuration for a mesh data source.""" + + name: str = "" + type: str = "" # "meshview", "meshmonitor", or "mqtt" + url: str = "" + api_token: str = "" # MeshMonitor only, supports ${ENV_VAR} + refresh_interval: int = 30 # Tick interval in seconds (default 30) + polite_mode: bool = False # Reduces polling frequency for shared instances + enabled: bool = True + + # MQTT-specific fields (type=mqtt only) + host: str = "" # MQTT broker hostname + port: int = 1883 # MQTT broker port (1883 plain, 8883 TLS) + username: str = "" # MQTT username (optional) + password: str = "" # MQTT password (optional, supports ) + topic_root: str = "msh/US" # Topic root to subscribe to + use_tls: bool = False # Enable TLS for MQTT connection + + +@dataclass +class RegionAnchor: + """A fixed region anchor point with geographic context.""" + + name: str = "" + lat: float = 0.0 + lon: float = 0.0 + local_name: str = "" # e.g., "Magic Valley" + description: str = "" # e.g., "Twin Falls, Burley, Jerome along I-84/US-93" + aliases: list[str] = field(default_factory=list) # e.g., ["southern Idaho", "magic valley"] + cities: list[str] = field(default_factory=list) # e.g., ["Twin Falls", "Burley", "Jerome"] + nws_zones: list[str] = field(default_factory=list) # NWS zone codes (e.g., ["IDZ016", "IDZ030"]) + + +@dataclass +class AlertRulesConfig: + """Per-condition alert toggles and thresholds.""" + + # Infrastructure + infra_offline: bool = True + infra_recovery: bool = True + new_router: bool = True + + # Power + battery_trend_declining: bool = True + battery_warning: bool = True + battery_critical: bool = True + battery_emergency: bool = True + battery_warning_threshold: int = 30 + battery_critical_threshold: int = 15 + battery_emergency_threshold: int = 5 + # Voltage-based thresholds (more accurate than percentage) + battery_warning_voltage: float = 3.60 + battery_critical_voltage: float = 3.50 + battery_emergency_voltage: float = 3.40 + power_source_change: bool = True + solar_not_charging: bool = True + + # Utilization + sustained_high_util: bool = True + high_util_threshold: float = 40.0 + high_util_hours: int = 6 + packet_flood: bool = True + packet_flood_threshold: int = 10 + + # Coverage + infra_single_gateway: bool = True + feeder_offline: bool = True + region_total_blackout: bool = True + + # Health Scores + mesh_score_alert: bool = True + mesh_score_threshold: int = 65 + region_score_alert: bool = True + region_score_threshold: int = 60 + + +@dataclass +class MeshIntelligenceConfig: + """Mesh intelligence and health scoring settings.""" + + enabled: bool = False + regions: list[RegionAnchor] = field(default_factory=list) # Fixed region anchors + locality_radius_miles: float = 8.0 # Radius for locality clustering within regions + offline_threshold_hours: int = 2 # Hours before node considered offline + packet_threshold: int = 500 # Non-text packets per 24h to flag + # TODO: behavior pillar uses wrong scale - see meshai-v03-notification-handoff.md bug #2 + battery_warning_percent: int = 30 # Battery level for warnings + + # Alert settings + critical_nodes: list[str] = field(default_factory=list) # Short names of critical nodes (e.g., ["MHR", "HPR"]) + alert_channel: int = -1 # Channel to broadcast alerts on. -1 = disabled, 0+ = channel index + alert_cooldown_minutes: int = 30 # Min minutes between repeated alerts for same condition + alert_rules: AlertRulesConfig = field(default_factory=AlertRulesConfig) + + +# Environmental feed configs +@dataclass +class NWSConfig: + """NWS weather alerts settings.""" + + enabled: bool = True + + # MQTT-specific fields (type=mqtt only) + host: str = "" # MQTT broker hostname + port: int = 1883 # MQTT broker port (1883 plain, 8883 TLS) + username: str = "" # MQTT username (optional) + password: str = "" # MQTT password (optional, supports ) + topic_root: str = "msh/US" # Topic root to subscribe to + use_tls: bool = False # Enable TLS for MQTT connection + tick_seconds: int = 60 + areas: list = field(default_factory=lambda: ["ID"]) + severity_min: str = "moderate" + user_agent: str = "" + + +@dataclass +class SWPCConfig: + """NOAA Space Weather settings.""" + + enabled: bool = True + + # MQTT-specific fields (type=mqtt only) + host: str = "" # MQTT broker hostname + port: int = 1883 # MQTT broker port (1883 plain, 8883 TLS) + username: str = "" # MQTT username (optional) + password: str = "" # MQTT password (optional, supports ) + topic_root: str = "msh/US" # Topic root to subscribe to + use_tls: bool = False # Enable TLS for MQTT connection + + +@dataclass +class DuctingConfig: + """Tropospheric ducting settings.""" + + enabled: bool = True + + # MQTT-specific fields (type=mqtt only) + host: str = "" # MQTT broker hostname + port: int = 1883 # MQTT broker port (1883 plain, 8883 TLS) + username: str = "" # MQTT username (optional) + password: str = "" # MQTT password (optional, supports ) + topic_root: str = "msh/US" # Topic root to subscribe to + use_tls: bool = False # Enable TLS for MQTT connection + tick_seconds: int = 10800 # 3 hours + latitude: float = 42.56 # Twin Falls area default + longitude: float = -114.47 + + +@dataclass +class NICFFiresConfig: + """NIFC fire perimeters settings (Phase 2).""" + + enabled: bool = False + tick_seconds: int = 600 + state: str = "US-ID" + + +@dataclass +class AvalancheConfig: + """Avalanche advisory settings (Phase 2).""" + + enabled: bool = False + tick_seconds: int = 1800 + center_ids: list = field(default_factory=lambda: ["SNFAC"]) + season_months: list = field(default_factory=lambda: [12, 1, 2, 3, 4]) + + +@dataclass +class USGSConfig: + """USGS stream gauge settings.""" + + enabled: bool = False + tick_seconds: int = 900 # Minimum 15 min per USGS guidelines + sites: list = field(default_factory=list) # Site IDs, e.g. ["13090500"] + flood_thresholds: dict = field(default_factory=dict) # {site_id: {flow: X, height: Y}} + + +@dataclass +class TomTomConfig: + """TomTom traffic flow settings.""" + + enabled: bool = False + tick_seconds: int = 300 + api_key: str = "" # Supports ${ENV_VAR} + corridors: list = field(default_factory=list) # [{name, lat, lon}, ...] + + +@dataclass +class Roads511Config: + """511 road conditions settings.""" + + enabled: bool = False + tick_seconds: int = 300 + api_key: str = "" # Supports ${ENV_VAR} + base_url: str = "" # State-specific, e.g. "https://511.idaho.gov/api/v2" + endpoints: list = field(default_factory=lambda: ["/get/event"]) + bbox: list = field(default_factory=list) # [west, south, east, north] + + +@dataclass +class FIRMSConfig: + """NASA FIRMS satellite fire hotspot settings.""" + + enabled: bool = False + tick_seconds: int = 1800 # 30 min default + map_key: str = "" # NASA FIRMS MAP_KEY, get at https://firms.modaps.eosdis.nasa.gov/api/area/ + source: str = "VIIRS_SNPP_NRT" # VIIRS_SNPP_NRT, VIIRS_NOAA20_NRT, MODIS_NRT + bbox: list = field(default_factory=list) # [west, south, east, north] + day_range: int = 1 # 1-10 days of data + confidence_min: str = "nominal" # low, nominal, high + proximity_km: float = 10.0 # km to match known fire + + +@dataclass +class EnvironmentalConfig: + """Environmental feeds settings.""" + + enabled: bool = False + nws_zones: list = field(default_factory=lambda: ["IDZ016", "IDZ030"]) + nws: NWSConfig = field(default_factory=NWSConfig) + swpc: SWPCConfig = field(default_factory=SWPCConfig) + ducting: DuctingConfig = field(default_factory=DuctingConfig) + fires: NICFFiresConfig = field(default_factory=NICFFiresConfig) + avalanche: AvalancheConfig = field(default_factory=AvalancheConfig) + usgs: USGSConfig = field(default_factory=USGSConfig) + traffic: TomTomConfig = field(default_factory=TomTomConfig) + roads511: Roads511Config = field(default_factory=Roads511Config) + firms: FIRMSConfig = field(default_factory=FIRMSConfig) + + +@dataclass +class NotificationRuleConfig: + """Self-contained notification rule with inline delivery config.""" + + name: str = "" + enabled: bool = True + + # Trigger type + trigger_type: str = "condition" # "condition" or "schedule" + + # Condition trigger fields + categories: list = field(default_factory=list) # Empty = all categories + min_severity: str = "routine" + + # Schedule trigger fields + schedule_frequency: str = "daily" # daily, twice_daily, weekly, custom + schedule_time: str = "07:00" + schedule_time_2: str = "19:00" # For twice_daily + schedule_days: list = field(default_factory=list) # For weekly + schedule_cron: str = "" # For custom + schedule_match: Optional[str] = None # "digest" for digest deliveries + message_type: str = "mesh_health_summary" + custom_message: str = "" + + # Delivery type + delivery_type: str = "" # mesh_broadcast, mesh_dm, email, webhook + + # Mesh broadcast fields + broadcast_channel: int = 0 + + # Mesh DM fields + node_ids: list = field(default_factory=list) + + # Email fields + smtp_host: str = "" + smtp_port: int = 587 + smtp_user: str = "" + smtp_password: str = "" + smtp_tls: bool = True + from_address: str = "" + recipients: list = field(default_factory=list) + + # Webhook fields + webhook_url: str = "" + webhook_headers: dict = field(default_factory=dict) + + # Behavior + cooldown_minutes: int = 10 + override_quiet: bool = False + + # Legacy field for migration (ignored in new format) + channel_ids: list = field(default_factory=list) + + +@dataclass +class DigestConfig: + """Digest scheduler settings.""" + + schedule: str = "07:00" # HH:MM time to fire digest + include: list[str] = field(default_factory=list) # Toggle names to include (empty = default set) + + +@dataclass +class NotificationsConfig: + """Notification system settings.""" + + enabled: bool = False + quiet_hours_enabled: bool = True # Master toggle for quiet hours + quiet_hours_start: str = "22:00" + quiet_hours_end: str = "06:00" + digest: DigestConfig = field(default_factory=DigestConfig) + rules: list = field(default_factory=list) # List of NotificationRuleConfig + +@dataclass +class DashboardConfig: + """Web dashboard settings.""" + + enabled: bool = True + port: int = 8080 + host: str = "0.0.0.0" + +@dataclass +class Config: + """Main configuration container.""" + + # Global settings + timezone: str = "America/Boise" # IANA timezone for local time display + + bot: BotConfig = field(default_factory=BotConfig) + connection: ConnectionConfig = field(default_factory=ConnectionConfig) + response: ResponseConfig = field(default_factory=ResponseConfig) + history: HistoryConfig = field(default_factory=HistoryConfig) + memory: MemoryConfig = field(default_factory=MemoryConfig) + context: ContextConfig = field(default_factory=ContextConfig) + commands: CommandsConfig = field(default_factory=CommandsConfig) + llm: LLMConfig = field(default_factory=LLMConfig) + weather: WeatherConfig = field(default_factory=WeatherConfig) + meshmonitor: MeshMonitorConfig = field(default_factory=MeshMonitorConfig) + knowledge: KnowledgeConfig = field(default_factory=KnowledgeConfig) + mesh_sources: list[MeshSourceConfig] = field(default_factory=list) + mesh_intelligence: MeshIntelligenceConfig = field(default_factory=MeshIntelligenceConfig) + environmental: EnvironmentalConfig = field(default_factory=EnvironmentalConfig) + dashboard: DashboardConfig = field(default_factory=DashboardConfig) + notifications: NotificationsConfig = field(default_factory=NotificationsConfig) + + _config_path: Optional[Path] = field(default=None, repr=False) + + def resolve_api_key(self) -> str: + """Resolve API key from config or environment.""" + if self.llm.api_key: + # Check if it's an env var reference like ${LLM_API_KEY} + if self.llm.api_key.startswith("${") and self.llm.api_key.endswith("}"): + env_var = self.llm.api_key[2:-1] + return os.environ.get(env_var, "") + return self.llm.api_key + # Fall back to common env vars + for env_var in ["LLM_API_KEY", "OPENAI_API_KEY", "ANTHROPIC_API_KEY"]: + if value := os.environ.get(env_var): + return value + return "" + + +def _migrate_legacy_channels(notifications, data: dict): + """Migrate legacy channels+rules format to self-contained rules.""" + old_channels = data.get("channels", []) + old_rules = data.get("rules", []) + + if not old_channels: + return + + _config_logger.info("Migrating %d legacy notification channels to inline rules", len(old_channels)) + + # Build channel lookup + channel_map = {} + for ch in old_channels: + if isinstance(ch, dict): + channel_map[ch.get("id", "")] = ch + + # Convert each old rule + referenced channels to new format + migrated_rules = [] + for old_rule in old_rules: + if not isinstance(old_rule, dict): + continue + + channel_ids = old_rule.get("channel_ids", []) + if not channel_ids: + continue + + for ch_id in channel_ids: + ch = channel_map.get(ch_id) + if not ch: + continue + + # Create new rule with inline delivery config + new_rule = NotificationRuleConfig( + name=old_rule.get("name", "") or ch_id, + enabled=ch.get("enabled", True), + trigger_type="condition", + categories=old_rule.get("categories", []), + min_severity=old_rule.get("min_severity", "priority"), + delivery_type=ch.get("type", "mesh_broadcast"), + broadcast_channel=ch.get("channel_index", 0), + node_ids=ch.get("node_ids", []), + smtp_host=ch.get("smtp_host", ""), + smtp_port=ch.get("smtp_port", 587), + smtp_user=ch.get("smtp_user", ""), + smtp_password=ch.get("smtp_password", ""), + smtp_tls=ch.get("smtp_tls", True), + from_address=ch.get("from_address", ""), + recipients=ch.get("recipients", []), + webhook_url=ch.get("url", ""), + webhook_headers=ch.get("headers", {}), + cooldown_minutes=10, + override_quiet=old_rule.get("override_quiet", False), + ) + migrated_rules.append(new_rule) + + # Replace rules with migrated ones (migrated rules come first, then any new-format rules) + if migrated_rules: + # Keep only non-migrated rules (those without channel_ids) + existing_new_rules = [r for r in notifications.rules if not getattr(r, 'channel_ids', [])] + notifications.rules = migrated_rules + existing_new_rules + _config_logger.info("Migrated to %d self-contained rules", len(notifications.rules)) + + +def _dict_to_dataclass(cls, data: dict): + """Recursively convert dict to dataclass, handling nested structures.""" + if data is None: + return cls() + + field_types = {f.name: f.type for f in cls.__dataclass_fields__.values()} + kwargs = {} + + for key, value in data.items(): + if key.startswith("_"): + continue + if key not in field_types: + continue + + field_type = field_types[key] + + # Handle nested dataclasses + if hasattr(field_type, "__dataclass_fields__") and isinstance(value, dict): + kwargs[key] = _dict_to_dataclass(field_type, value) + # Handle list of MeshSourceConfig + elif key == "mesh_sources" and isinstance(value, list): + kwargs[key] = [ + _dict_to_dataclass(MeshSourceConfig, item) + if isinstance(item, dict) else item + for item in value + ] + # Handle list of RegionAnchor + elif key == "regions" and isinstance(value, list): + kwargs[key] = [ + _dict_to_dataclass(RegionAnchor, item) + if isinstance(item, dict) else item + for item in value + ] + # Handle AlertRulesConfig + elif key == "alert_rules" and isinstance(value, dict): + kwargs[key] = _dict_to_dataclass(AlertRulesConfig, value) + # Handle nested environmental configs + elif key == "nws" and isinstance(value, dict): + kwargs[key] = _dict_to_dataclass(NWSConfig, value) + elif key == "swpc" and isinstance(value, dict): + kwargs[key] = _dict_to_dataclass(SWPCConfig, value) + elif key == "ducting" and isinstance(value, dict): + kwargs[key] = _dict_to_dataclass(DuctingConfig, value) + elif key == "fires" and isinstance(value, dict): + kwargs[key] = _dict_to_dataclass(NICFFiresConfig, value) + elif key == "avalanche" and isinstance(value, dict): + kwargs[key] = _dict_to_dataclass(AvalancheConfig, value) + elif key == "usgs" and isinstance(value, dict): + kwargs[key] = _dict_to_dataclass(USGSConfig, value) + elif key == "traffic" and isinstance(value, dict): + kwargs[key] = _dict_to_dataclass(TomTomConfig, value) + elif key == "roads511" and isinstance(value, dict): + kwargs[key] = _dict_to_dataclass(Roads511Config, value) + elif key == "firms" and isinstance(value, dict): + kwargs[key] = _dict_to_dataclass(FIRMSConfig, value) + elif key == "dashboard" and isinstance(value, dict): + kwargs[key] = _dict_to_dataclass(DashboardConfig, value) + elif key == "digest" and isinstance(value, dict): + kwargs[key] = _dict_to_dataclass(DigestConfig, value) + elif key == "notifications" and isinstance(value, dict): + notifications = _dict_to_dataclass(NotificationsConfig, value) + if "rules" in value and isinstance(value["rules"], list): + notifications.rules = [_dict_to_dataclass(NotificationRuleConfig, r) if isinstance(r, dict) else r for r in value["rules"]] + # Migrate old channels+rules format if present + if "channels" in value and isinstance(value["channels"], list) and value["channels"]: + _migrate_legacy_channels(notifications, value) + kwargs[key] = notifications + else: + kwargs[key] = value + + return cls(**kwargs) + + +def _dataclass_to_dict(obj) -> dict: + """Recursively convert dataclass to dict for YAML serialization.""" + if not hasattr(obj, "__dataclass_fields__"): + return obj + + result = {} + for field_name in obj.__dataclass_fields__: + if field_name.startswith("_"): + continue + value = getattr(obj, field_name) + if hasattr(value, "__dataclass_fields__"): + result[field_name] = _dataclass_to_dict(value) + elif isinstance(value, list): + # Handle list of dataclasses (like mesh_sources) + result[field_name] = [ + _dataclass_to_dict(item) if hasattr(item, "__dataclass_fields__") else item + for item in value + ] + else: + result[field_name] = value + return result + + +def load_config(config_path: Optional[Path] = None) -> Config: + """Load configuration from YAML file. + + Args: + config_path: Path to config file. Defaults to ./config.yaml + + Returns: + Config object with loaded settings + """ + if config_path is None: + config_path = Path("config.yaml") + + config_path = Path(config_path) + + if not config_path.exists(): + # Return default config if file doesn't exist + config = Config() + config._config_path = config_path + return config + + with open(config_path, "r") as f: + data = yaml.safe_load(f) or {} + + config = _dict_to_dataclass(Config, data) + config._config_path = config_path + return config + + +def save_config(config: Config, config_path: Optional[Path] = None) -> None: + """Save configuration to YAML file. + + Args: + config: Config object to save + config_path: Path to save to. Uses config._config_path if not specified + """ + if config_path is None: + config_path = config._config_path or Path("config.yaml") + + config_path = Path(config_path) + + data = _dataclass_to_dict(config) + + # Add header comment + header = "# MeshAI Configuration\n# Generated by meshai --config\n\n" + + with open(config_path, "w") as f: + f.write(header) + yaml.dump(data, f, default_flow_style=False, sort_keys=False, allow_unicode=True) diff --git a/meshai/notifications/pipeline/__init__.py b/meshai/notifications/pipeline/__init__.py index e90f5b6..eb23f81 100644 --- a/meshai/notifications/pipeline/__init__.py +++ b/meshai/notifications/pipeline/__init__.py @@ -1,17 +1,23 @@ """Notification pipeline package. -Phase 2.1 + 2.2 + 2.3a: +Phase 2.1 + 2.2 + 2.3a + 2.3b: - EventBus: pub/sub ingress - Inhibitor: suppresses redundant events by inhibit_keys - Grouper: coalesces events sharing group_key within a window - SeverityRouter: forks immediate vs digest - Dispatcher: routes immediate via channels (existing rules schema) - DigestAccumulator: tracks priority/routine events for periodic digest + - DigestScheduler: fires digest at configured time (Phase 2.3b) Usage: - from meshai.notifications.pipeline import build_pipeline + from meshai.notifications.pipeline import build_pipeline, start_pipeline, stop_pipeline bus = build_pipeline(config) bus.emit(event) + + # Async lifecycle + scheduler = await start_pipeline(bus, config) + ... + await stop_pipeline(scheduler) """ from meshai.notifications.channels import create_channel @@ -24,13 +30,26 @@ from meshai.notifications.pipeline.dispatcher import Dispatcher from meshai.notifications.pipeline.inhibitor import Inhibitor from meshai.notifications.pipeline.grouper import Grouper from meshai.notifications.pipeline.digest import DigestAccumulator, Digest +from meshai.notifications.pipeline.scheduler import DigestScheduler def build_pipeline(config) -> EventBus: - """Build the pipeline and return the EventBus.""" + """Build the pipeline and return the EventBus. + + Components are stashed on bus._pipeline_components for lifecycle use. + """ bus = EventBus() dispatcher = Dispatcher(config, create_channel) - digest = DigestAccumulator() + + # Build include_toggles from config + digest_cfg = getattr(config.notifications, "digest", None) + include_toggles = None + if digest_cfg is not None: + include_list = getattr(digest_cfg, "include", None) + if include_list: + include_toggles = list(include_list) + + digest = DigestAccumulator(include_toggles=include_toggles) severity_router = SeverityRouter( immediate_handler=dispatcher.dispatch, digest_handler=digest.enqueue, @@ -38,6 +57,16 @@ def build_pipeline(config) -> EventBus: grouper = Grouper(next_handler=severity_router.handle) inhibitor = Inhibitor(next_handler=grouper.handle) bus.subscribe(inhibitor.handle) + + # Stash components for lifecycle management + bus._pipeline_components = { + "inhibitor": inhibitor, + "grouper": grouper, + "severity_router": severity_router, + "dispatcher": dispatcher, + "digest": digest, + } + return bus @@ -48,7 +77,16 @@ def build_pipeline_components(config) -> tuple: """ bus = EventBus() dispatcher = Dispatcher(config, create_channel) - digest = DigestAccumulator() + + # Build include_toggles from config + digest_cfg = getattr(config.notifications, "digest", None) + include_toggles = None + if digest_cfg is not None: + include_list = getattr(digest_cfg, "include", None) + if include_list: + include_toggles = list(include_list) + + digest = DigestAccumulator(include_toggles=include_toggles) severity_router = SeverityRouter( immediate_handler=dispatcher.dispatch, digest_handler=digest.enqueue, @@ -59,6 +97,45 @@ def build_pipeline_components(config) -> tuple: return bus, inhibitor, grouper, severity_router, dispatcher, digest +async def start_pipeline(bus: EventBus, config) -> DigestScheduler: + """Start the pipeline's async components (scheduler). + + Args: + bus: EventBus returned by build_pipeline() + config: Config object with notifications.digest settings + + Returns: + DigestScheduler instance (running). Call stop_pipeline() to stop. + """ + components = getattr(bus, "_pipeline_components", None) + if components is None: + raise RuntimeError("bus missing _pipeline_components; use build_pipeline()") + + digest = components["digest"] + + scheduler = DigestScheduler( + accumulator=digest, + config=config, + channel_factory=create_channel, + ) + await scheduler.start() + + # Stash scheduler for stop_pipeline + bus._pipeline_scheduler = scheduler + + return scheduler + + +async def stop_pipeline(scheduler: DigestScheduler) -> None: + """Stop the pipeline's async components. + + Args: + scheduler: DigestScheduler returned by start_pipeline() + """ + if scheduler is not None: + await scheduler.stop() + + __all__ = [ "EventBus", "SeverityRouter", @@ -68,7 +145,10 @@ __all__ = [ "Grouper", "DigestAccumulator", "Digest", + "DigestScheduler", "build_pipeline", "build_pipeline_components", + "start_pipeline", + "stop_pipeline", "get_bus", ] diff --git a/meshai/notifications/pipeline/scheduler.py b/meshai/notifications/pipeline/scheduler.py new file mode 100644 index 0000000..691c431 --- /dev/null +++ b/meshai/notifications/pipeline/scheduler.py @@ -0,0 +1,213 @@ +"""Digest scheduler — fires the digest at a configured time of day. + +Reads schedule and channel routing from config; calls +accumulator.render_digest() at the scheduled time; delivers the +result to all rules matching trigger_type=='schedule' and +schedule_match=='digest'. +""" + +import asyncio +import logging +import time +from datetime import datetime, timedelta +from typing import Callable, Optional + +from meshai.notifications.pipeline.digest import DigestAccumulator + + +class DigestScheduler: + """Fires digest at configured time and routes to matching channels.""" + + def __init__( + self, + accumulator: DigestAccumulator, + config, + channel_factory: Callable, + clock: Optional[Callable[[], float]] = None, + sleep: Optional[Callable[[float], "asyncio.Future"]] = None, + ): + self._accumulator = accumulator + self._config = config + self._channel_factory = channel_factory + self._clock = clock or time.time + self._sleep = sleep or asyncio.sleep + self._task: Optional[asyncio.Task] = None + self._stop_event: Optional[asyncio.Event] = None + self._last_fire_at: float = 0.0 + self._logger = logging.getLogger("meshai.pipeline.scheduler") + + async def start(self) -> None: + """Begin the scheduler loop as an asyncio task.""" + if self._task is not None and not self._task.done(): + raise RuntimeError("Scheduler already running") + self._stop_event = asyncio.Event() + self._task = asyncio.create_task(self._run(), name="digest-scheduler") + self._logger.info( + f"Digest scheduler started, schedule={self._schedule_str()!r}" + ) + + async def stop(self) -> None: + """Signal stop and wait for the task to finish.""" + if self._task is None: + return + if self._stop_event: + self._stop_event.set() + self._task.cancel() + try: + await self._task + except (asyncio.CancelledError, Exception): + # Cancellation is expected; other exceptions already logged + pass + self._task = None + self._logger.info("Digest scheduler stopped") + + async def _run(self) -> None: + """Main loop: sleep until next fire, fire, repeat.""" + try: + while self._stop_event and not self._stop_event.is_set(): + now = self._clock() + next_fire = self._next_fire_at(now) + delay = max(0.0, next_fire - now) + self._logger.info( + f"Next digest at {datetime.fromtimestamp(next_fire):%Y-%m-%d %H:%M}, " + f"sleeping {delay:.0f}s" + ) + # Interruptible sleep — wakes early if stop() is called + try: + await asyncio.wait_for( + self._stop_event.wait(), + timeout=delay, + ) + # If we got here without timeout, stop was requested + return + except asyncio.TimeoutError: + pass # Timeout fired = digest time arrived + + if self._stop_event.is_set(): + return + try: + await self._fire(self._clock()) + except Exception: + self._logger.exception("Digest fire failed; will retry next cycle") + except asyncio.CancelledError: + raise + except Exception: + self._logger.exception("Scheduler loop crashed unexpectedly") + raise + + async def _fire(self, now: float) -> None: + """Render and deliver one digest.""" + self._logger.info(f"Firing digest at {datetime.fromtimestamp(now):%H:%M}") + digest = self._accumulator.render_digest(now) + self._last_fire_at = now + + rules = self._matching_rules() + if not rules: + self._logger.warning( + "No digest delivery rules configured (need rules with " + "trigger_type=='schedule' and schedule_match=='digest')" + ) + return + + for rule in rules: + try: + await self._deliver_to_rule(rule, digest, now) + except Exception: + self._logger.exception( + f"Digest delivery failed for rule {rule.name!r}" + ) + + async def _deliver_to_rule(self, rule, digest, now: float) -> None: + """Hand the rendered digest to a channel based on rule.delivery_type.""" + channel = self._channel_factory(rule) + delivery_type = rule.delivery_type + + if delivery_type in ("mesh_broadcast", "mesh_dm"): + # One deliver call per chunk + chunks = digest.mesh_chunks + total = len(chunks) + for i, chunk in enumerate(chunks, start=1): + payload = { + "category": "digest", + "severity": "routine", + "message": chunk, + "node_id": None, + "region": None, + "timestamp": now, + "chunk_index": i, + "chunk_total": total, + } + channel.deliver(payload) + self._logger.info( + f"Delivered {total} mesh chunk(s) to rule {rule.name!r}" + ) + else: + # Single full-form delivery + payload = { + "category": "digest", + "severity": "routine", + "message": digest.full, + "node_id": None, + "region": None, + "timestamp": now, + } + channel.deliver(payload) + self._logger.info( + f"Delivered digest to rule {rule.name!r} via {delivery_type}" + ) + + def _matching_rules(self) -> list: + """Find enabled schedule rules tagged as digest deliveries.""" + matches = [] + for rule in self._config.notifications.rules: + if not rule.enabled: + continue + if rule.trigger_type != "schedule": + continue + # schedule_match is the discriminator. Operators set it to + # "digest" to receive the morning digest. Other values + # reserved for future schedule types. + schedule_match = getattr(rule, "schedule_match", None) + if schedule_match != "digest": + continue + matches.append(rule) + return matches + + def _next_fire_at(self, now: float) -> float: + """Compute the next epoch timestamp when the digest should fire. + + Reads schedule HH:MM from config. If today's fire time has + already passed, returns tomorrow's. Uses local timezone. + """ + schedule_str = self._schedule_str() + h, m = self._parse_schedule(schedule_str) + now_dt = datetime.fromtimestamp(now) + target_today = now_dt.replace(hour=h, minute=m, second=0, microsecond=0) + if target_today.timestamp() <= now: + target = target_today + timedelta(days=1) + else: + target = target_today + return target.timestamp() + + def _schedule_str(self) -> str: + digest_cfg = getattr(self._config.notifications, "digest", None) + if digest_cfg is None: + return "07:00" + return getattr(digest_cfg, "schedule", "07:00") + + @staticmethod + def _parse_schedule(s: str) -> tuple[int, int]: + """Parse 'HH:MM' to (hour, minute). Falls back to 07:00 on bad input.""" + try: + hh, mm = s.strip().split(":", 1) + h = int(hh) + m = int(mm) + if not (0 <= h <= 23 and 0 <= m <= 59): + raise ValueError(f"out of range: {s}") + return h, m + except (ValueError, AttributeError): + # Fall back to 07:00 rather than crash the loop + return 7, 0 + + def last_fire_at(self) -> float: + return self._last_fire_at diff --git a/tests/test_pipeline_scheduler.py b/tests/test_pipeline_scheduler.py new file mode 100644 index 0000000..e8ab568 --- /dev/null +++ b/tests/test_pipeline_scheduler.py @@ -0,0 +1,587 @@ +"""Tests for DigestScheduler (Phase 2.3b). + +Uses asyncio.run() since pytest-asyncio is not available in the container. +""" + +import asyncio +import time +from dataclasses import dataclass, field +from datetime import datetime, timedelta +from typing import Optional +from unittest.mock import MagicMock, call + +import pytest + +from meshai.notifications.events import make_event +from meshai.notifications.pipeline.digest import DigestAccumulator +from meshai.notifications.pipeline.scheduler import DigestScheduler + + +# ---- Test Fixtures ---- + +@dataclass +class MockRule: + """Mock notification rule for testing.""" + name: str = "test-rule" + enabled: bool = True + trigger_type: str = "schedule" + schedule_match: str = "digest" + delivery_type: str = "mesh_broadcast" + broadcast_channel: int = 0 + + +@dataclass +class MockDigestConfig: + """Mock digest config.""" + schedule: str = "07:00" + include: list = field(default_factory=list) + + +@dataclass +class MockNotificationsConfig: + """Mock notifications config.""" + enabled: bool = True + digest: MockDigestConfig = field(default_factory=MockDigestConfig) + rules: list = field(default_factory=list) + + +@dataclass +class MockConfig: + """Mock config for scheduler tests.""" + notifications: MockNotificationsConfig = field(default_factory=MockNotificationsConfig) + + +class MockChannel: + """Mock channel that records deliveries.""" + + def __init__(self): + self.deliveries = [] + + def deliver(self, payload: dict): + self.deliveries.append(payload) + + +def make_scheduler( + schedule: str = "07:00", + rules: Optional[list] = None, + clock: Optional[callable] = None, + sleep: Optional[callable] = None, + accumulator: Optional[DigestAccumulator] = None, +) -> tuple[DigestScheduler, MockConfig, dict]: + """Factory for creating test schedulers. + + Returns (scheduler, config, channels_by_rule_name). + """ + if rules is None: + rules = [MockRule()] + + config = MockConfig( + notifications=MockNotificationsConfig( + digest=MockDigestConfig(schedule=schedule), + rules=rules, + ) + ) + + channels = {} + + def channel_factory(rule): + ch = MockChannel() + channels[rule.name] = ch + return ch + + if accumulator is None: + accumulator = DigestAccumulator() + + scheduler = DigestScheduler( + accumulator=accumulator, + config=config, + channel_factory=channel_factory, + clock=clock, + sleep=sleep, + ) + + return scheduler, config, channels + + +# ---- Schedule Computation Tests ---- + +class TestScheduleComputation: + """Tests for _next_fire_at and _parse_schedule.""" + + def test_parse_schedule_valid(self): + """Valid HH:MM parses correctly.""" + scheduler, _, _ = make_scheduler() + assert scheduler._parse_schedule("07:00") == (7, 0) + assert scheduler._parse_schedule("23:59") == (23, 59) + assert scheduler._parse_schedule("00:00") == (0, 0) + assert scheduler._parse_schedule("12:30") == (12, 30) + + def test_parse_schedule_with_whitespace(self): + """Whitespace is stripped.""" + scheduler, _, _ = make_scheduler() + assert scheduler._parse_schedule(" 07:00 ") == (7, 0) + + def test_parse_schedule_invalid_falls_back(self): + """Invalid schedules fall back to 07:00.""" + scheduler, _, _ = make_scheduler() + # Bad format + assert scheduler._parse_schedule("7:00:00") == (7, 0) + assert scheduler._parse_schedule("invalid") == (7, 0) + assert scheduler._parse_schedule("") == (7, 0) + # Out of range + assert scheduler._parse_schedule("25:00") == (7, 0) + assert scheduler._parse_schedule("12:60") == (7, 0) + + def test_next_fire_at_future_today(self): + """If schedule time is later today, returns today's timestamp.""" + # Set clock to 06:00 on a known date + base_dt = datetime(2024, 6, 15, 6, 0, 0) + base_ts = base_dt.timestamp() + + scheduler, _, _ = make_scheduler(schedule="07:00", clock=lambda: base_ts) + next_fire = scheduler._next_fire_at(base_ts) + + # Should be 07:00 same day + expected_dt = datetime(2024, 6, 15, 7, 0, 0) + assert abs(next_fire - expected_dt.timestamp()) < 1 + + def test_next_fire_at_past_today_schedules_tomorrow(self): + """If schedule time has passed today, returns tomorrow's timestamp.""" + # Set clock to 08:00 on a known date + base_dt = datetime(2024, 6, 15, 8, 0, 0) + base_ts = base_dt.timestamp() + + scheduler, _, _ = make_scheduler(schedule="07:00", clock=lambda: base_ts) + next_fire = scheduler._next_fire_at(base_ts) + + # Should be 07:00 next day + expected_dt = datetime(2024, 6, 16, 7, 0, 0) + assert abs(next_fire - expected_dt.timestamp()) < 1 + + def test_next_fire_at_exact_time_schedules_tomorrow(self): + """If clock is exactly at schedule time, schedules tomorrow.""" + base_dt = datetime(2024, 6, 15, 7, 0, 0) + base_ts = base_dt.timestamp() + + scheduler, _, _ = make_scheduler(schedule="07:00", clock=lambda: base_ts) + next_fire = scheduler._next_fire_at(base_ts) + + # Should be 07:00 next day + expected_dt = datetime(2024, 6, 16, 7, 0, 0) + assert abs(next_fire - expected_dt.timestamp()) < 1 + + def test_schedule_str_reads_from_config(self): + """_schedule_str reads from config.notifications.digest.schedule.""" + scheduler, _, _ = make_scheduler(schedule="19:30") + assert scheduler._schedule_str() == "19:30" + + def test_schedule_str_defaults_to_0700(self): + """Missing digest config defaults to 07:00.""" + config = MockConfig() + config.notifications.digest = None + + scheduler = DigestScheduler( + accumulator=DigestAccumulator(), + config=config, + channel_factory=lambda r: MockChannel(), + ) + assert scheduler._schedule_str() == "07:00" + + +# ---- Fire Behavior Tests ---- + +class TestFireBehavior: + """Tests for _fire() digest delivery.""" + + def test_fire_delivers_to_matching_rule(self): + """_fire() delivers digest to rules with schedule_match='digest'.""" + accumulator = DigestAccumulator() + # Add an event so digest has content + accumulator.enqueue(make_event( + source="test", + category="weather_warning", + severity="priority", + title="Test Alert", + summary="Test alert summary", + )) + + scheduler, _, channels = make_scheduler( + rules=[MockRule(name="digest-mesh")], + accumulator=accumulator, + ) + + now = time.time() + + async def run_fire(): + await scheduler._fire(now) + + asyncio.run(run_fire()) + + assert "digest-mesh" in channels + ch = channels["digest-mesh"] + assert len(ch.deliveries) == 1 + payload = ch.deliveries[0] + assert payload["category"] == "digest" + assert payload["severity"] == "routine" + assert "Test alert" in payload["message"] or "Weather" in payload["message"] + + def test_fire_skips_disabled_rules(self): + """Disabled rules are not delivered to.""" + scheduler, _, channels = make_scheduler( + rules=[MockRule(name="disabled", enabled=False)], + ) + + async def run_fire(): + await scheduler._fire(time.time()) + + asyncio.run(run_fire()) + + # Channel should not be created for disabled rule + assert "disabled" not in channels + + def test_fire_skips_non_schedule_rules(self): + """Rules with trigger_type != 'schedule' are skipped.""" + rule = MockRule(name="condition-rule", trigger_type="condition") + scheduler, _, channels = make_scheduler(rules=[rule]) + + async def run_fire(): + await scheduler._fire(time.time()) + + asyncio.run(run_fire()) + + assert "condition-rule" not in channels + + def test_fire_skips_non_digest_schedule_rules(self): + """Schedule rules with schedule_match != 'digest' are skipped.""" + rule = MockRule(name="other-schedule", schedule_match="daily_report") + scheduler, _, channels = make_scheduler(rules=[rule]) + + async def run_fire(): + await scheduler._fire(time.time()) + + asyncio.run(run_fire()) + + assert "other-schedule" not in channels + + def test_fire_mesh_delivery_chunks(self): + """Mesh delivery types get per-chunk delivery.""" + accumulator = DigestAccumulator(mesh_char_limit=100) + # Add multiple events to force chunking + for i in range(5): + accumulator.enqueue(make_event( + source="test", + category="weather_warning", + severity="priority", + title=f"Alert {i}", + summary=f"Weather alert number {i} with enough text to use space", + )) + + scheduler, _, channels = make_scheduler( + rules=[MockRule(name="mesh", delivery_type="mesh_broadcast")], + accumulator=accumulator, + ) + + now = time.time() + + async def run_fire(): + await scheduler._fire(now) + + asyncio.run(run_fire()) + + ch = channels["mesh"] + # Should have multiple deliveries (one per chunk) + assert len(ch.deliveries) >= 1 + # Check chunk metadata + for payload in ch.deliveries: + assert "chunk_index" in payload + assert "chunk_total" in payload + + def test_fire_email_delivery_full_text(self): + """Email delivery type gets single full-text delivery.""" + accumulator = DigestAccumulator() + accumulator.enqueue(make_event( + source="test", + category="weather_warning", + severity="priority", + title="Test Alert", + summary="Test alert summary", + )) + + scheduler, _, channels = make_scheduler( + rules=[MockRule(name="email", delivery_type="email")], + accumulator=accumulator, + ) + + async def run_fire(): + await scheduler._fire(time.time()) + + asyncio.run(run_fire()) + + ch = channels["email"] + assert len(ch.deliveries) == 1 + payload = ch.deliveries[0] + assert "chunk_index" not in payload + assert "--- " in payload["message"] # Full format has header + + def test_fire_updates_last_fire_at(self): + """_fire() updates last_fire_at timestamp.""" + scheduler, _, _ = make_scheduler() + assert scheduler.last_fire_at() == 0.0 + + now = time.time() + + async def run_fire(): + await scheduler._fire(now) + + asyncio.run(run_fire()) + + assert scheduler.last_fire_at() == now + + def test_fire_empty_digest_still_delivers(self): + """Empty digest is still delivered (with 'no alerts' message).""" + scheduler, _, channels = make_scheduler( + rules=[MockRule(name="mesh")], + ) + + async def run_fire(): + await scheduler._fire(time.time()) + + asyncio.run(run_fire()) + + ch = channels["mesh"] + assert len(ch.deliveries) == 1 + assert "No alerts" in ch.deliveries[0]["message"] + + +# ---- Lifecycle Tests ---- + +class TestLifecycle: + """Tests for start/stop lifecycle.""" + + def test_start_creates_task(self): + """start() creates and runs an asyncio task.""" + scheduler, _, _ = make_scheduler() + + async def run_start(): + await scheduler.start() + assert scheduler._task is not None + assert not scheduler._task.done() + await scheduler.stop() + + asyncio.run(run_start()) + + def test_start_twice_raises(self): + """Starting twice raises RuntimeError.""" + scheduler, _, _ = make_scheduler() + + async def run_double_start(): + await scheduler.start() + try: + with pytest.raises(RuntimeError, match="already running"): + await scheduler.start() + finally: + await scheduler.stop() + + asyncio.run(run_double_start()) + + def test_stop_cancels_task(self): + """stop() cancels the running task.""" + scheduler, _, _ = make_scheduler() + + async def run_stop(): + await scheduler.start() + task = scheduler._task + await scheduler.stop() + assert scheduler._task is None + assert task.done() + + asyncio.run(run_stop()) + + def test_stop_idempotent(self): + """stop() on non-running scheduler is safe.""" + scheduler, _, _ = make_scheduler() + + async def run_stop(): + # Never started + await scheduler.stop() + # Should not raise + + asyncio.run(run_stop()) + + def test_stop_event_interrupts_sleep(self): + """stop() interrupts the sleep and exits cleanly.""" + sleep_calls = [] + + async def fake_sleep(duration): + sleep_calls.append(duration) + # Actually sleep briefly so we can cancel + await asyncio.sleep(0.01) + + # Set clock far from schedule time to get long sleep + base_dt = datetime(2024, 6, 15, 8, 0, 0) + scheduler, _, _ = make_scheduler( + schedule="07:00", + clock=lambda: base_dt.timestamp(), + sleep=fake_sleep, + ) + + async def run_test(): + await scheduler.start() + # Give task time to enter sleep + await asyncio.sleep(0.05) + await scheduler.stop() + + asyncio.run(run_test()) + + # Task should have exited cleanly + + +# ---- Integration Tests ---- + +class TestIntegration: + """Integration tests with real timing (short intervals).""" + + def test_scheduler_fires_on_schedule(self): + """Scheduler fires when schedule time arrives.""" + fire_times = [] + accumulator = DigestAccumulator() + + # Start at 06:59:59.95 (50ms before 07:00), delay will be ~50ms + clock_time = [datetime(2024, 6, 15, 6, 59, 59, 950000).timestamp()] + + def fake_clock(): + return clock_time[0] + + scheduler, _, channels = make_scheduler( + schedule="07:00", + clock=fake_clock, + accumulator=accumulator, + ) + + # Track when fire happens + original_fire = scheduler._fire + + async def tracking_fire(now): + fire_times.append(now) + await original_fire(now) + # After first fire, advance clock so next cycle has long delay + clock_time[0] = datetime(2024, 6, 15, 8, 0, 0).timestamp() + + scheduler._fire = tracking_fire + + async def run_test(): + await scheduler.start() + # Wait for the ~50ms delay plus some buffer + await asyncio.sleep(0.2) + await scheduler.stop() + + asyncio.run(run_test()) + + # Should have fired once + assert len(fire_times) >= 1 + + def test_scheduler_multiple_rules(self): + """Scheduler delivers to multiple matching rules.""" + accumulator = DigestAccumulator() + accumulator.enqueue(make_event( + source="test", + category="weather_warning", + severity="priority", + title="Test", + summary="Test summary", + )) + + rules = [ + MockRule(name="mesh1", delivery_type="mesh_broadcast"), + MockRule(name="mesh2", delivery_type="mesh_dm"), + MockRule(name="email", delivery_type="email"), + ] + + scheduler, _, channels = make_scheduler( + rules=rules, + accumulator=accumulator, + ) + + async def run_fire(): + await scheduler._fire(time.time()) + + asyncio.run(run_fire()) + + # All three should have received deliveries + assert "mesh1" in channels + assert "mesh2" in channels + assert "email" in channels + assert len(channels["mesh1"].deliveries) >= 1 + assert len(channels["mesh2"].deliveries) >= 1 + assert len(channels["email"].deliveries) == 1 + + def test_scheduler_handles_delivery_error(self): + """Scheduler continues after delivery error.""" + accumulator = DigestAccumulator() + accumulator.enqueue(make_event( + source="test", + category="weather_warning", + severity="priority", + title="Test", + summary="Test", + )) + + rules = [ + MockRule(name="bad"), + MockRule(name="good"), + ] + + call_order = [] + + def bad_channel_factory(rule): + call_order.append(rule.name) + if rule.name == "bad": + ch = MagicMock() + ch.deliver.side_effect = RuntimeError("delivery failed") + return ch + return MockChannel() + + scheduler = DigestScheduler( + accumulator=accumulator, + config=MockConfig( + notifications=MockNotificationsConfig(rules=rules) + ), + channel_factory=bad_channel_factory, + ) + + async def run_fire(): + await scheduler._fire(time.time()) + + asyncio.run(run_fire()) + + # Both rules should have been attempted + assert "bad" in call_order + assert "good" in call_order + + +# ---- Matching Rules Tests ---- + +class TestMatchingRules: + """Tests for _matching_rules() filter logic.""" + + def test_matching_rules_filters_correctly(self): + """Only enabled schedule rules with schedule_match='digest' match.""" + rules = [ + MockRule(name="good", enabled=True, trigger_type="schedule", schedule_match="digest"), + MockRule(name="disabled", enabled=False, trigger_type="schedule", schedule_match="digest"), + MockRule(name="condition", enabled=True, trigger_type="condition", schedule_match="digest"), + MockRule(name="other-match", enabled=True, trigger_type="schedule", schedule_match="daily"), + MockRule(name="no-match", enabled=True, trigger_type="schedule", schedule_match=None), + ] + + scheduler, _, _ = make_scheduler(rules=rules) + matches = scheduler._matching_rules() + + assert len(matches) == 1 + assert matches[0].name == "good" + + def test_matching_rules_empty_when_no_rules(self): + """Returns empty list when no rules configured.""" + scheduler, _, _ = make_scheduler(rules=[]) + matches = scheduler._matching_rules() + assert matches == []