mirror of
https://github.com/zvx-echo6/meshai.git
synced 2026-05-21 15:14:45 +02:00
feat(notifications): Phase 2.3b digest scheduler
Adds DigestScheduler class that fires digest at configured time (default 07:00) and routes to rules with trigger_type=schedule and schedule_match=digest. - DigestScheduler: asyncio task with start/stop lifecycle - Config: DigestConfig dataclass with schedule and include fields - Config: schedule_match field on NotificationRuleConfig - Pipeline: start_pipeline/stop_pipeline async lifecycle functions - Mesh channels get per-chunk delivery, email/webhook get full text - 26 new tests covering schedule computation, fire behavior, lifecycle Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
8326fc56b2
commit
493b43f7cf
5 changed files with 1998 additions and 1082 deletions
|
|
@ -1,328 +1,352 @@
|
||||||
# MeshAI Configuration
|
# MeshAI Configuration
|
||||||
# LLM-powered Meshtastic assistant
|
# LLM-powered Meshtastic assistant
|
||||||
#
|
#
|
||||||
# Copy this to config.yaml and customize as needed
|
# Copy this to config.yaml and customize as needed
|
||||||
# For Docker: mount as /data/config.yaml
|
# For Docker: mount as /data/config.yaml
|
||||||
|
|
||||||
# === BOT IDENTITY ===
|
# === BOT IDENTITY ===
|
||||||
bot:
|
bot:
|
||||||
name: ai # Bot's display name
|
name: ai # Bot's display name
|
||||||
owner: "" # Owner's callsign (optional)
|
owner: "" # Owner's callsign (optional)
|
||||||
respond_to_dms: true # Respond to direct messages
|
respond_to_dms: true # Respond to direct messages
|
||||||
filter_bbs_protocols: true # Ignore advBBS sync/notification messages
|
filter_bbs_protocols: true # Ignore advBBS sync/notification messages
|
||||||
|
|
||||||
# === MESHTASTIC CONNECTION ===
|
# === MESHTASTIC CONNECTION ===
|
||||||
connection:
|
connection:
|
||||||
type: tcp # serial | tcp
|
type: tcp # serial | tcp
|
||||||
serial_port: /dev/ttyUSB0 # For serial connection
|
serial_port: /dev/ttyUSB0 # For serial connection
|
||||||
tcp_host: localhost # For TCP connection (meshtasticd)
|
tcp_host: localhost # For TCP connection (meshtasticd)
|
||||||
tcp_port: 4403
|
tcp_port: 4403
|
||||||
|
|
||||||
# === RESPONSE BEHAVIOR ===
|
# === RESPONSE BEHAVIOR ===
|
||||||
response:
|
response:
|
||||||
delay_min: 2.2 # Min delay before responding (seconds)
|
delay_min: 2.2 # Min delay before responding (seconds)
|
||||||
delay_max: 3.0 # Max delay before responding
|
delay_max: 3.0 # Max delay before responding
|
||||||
max_length: 200 # Max chars per message chunk
|
max_length: 200 # Max chars per message chunk
|
||||||
max_messages: 3 # Max message chunks per response
|
max_messages: 3 # Max message chunks per response
|
||||||
|
|
||||||
# === CONVERSATION HISTORY ===
|
# === CONVERSATION HISTORY ===
|
||||||
history:
|
history:
|
||||||
database: /data/conversations.db
|
database: /data/conversations.db
|
||||||
max_messages_per_user: 50 # Messages to keep per user
|
max_messages_per_user: 50 # Messages to keep per user
|
||||||
conversation_timeout: 86400 # Conversation expiry (seconds, 86400=24h)
|
conversation_timeout: 86400 # Conversation expiry (seconds, 86400=24h)
|
||||||
auto_cleanup: true # Auto-delete old conversations
|
auto_cleanup: true # Auto-delete old conversations
|
||||||
cleanup_interval_hours: 24 # How often to run cleanup
|
cleanup_interval_hours: 24 # How often to run cleanup
|
||||||
max_age_days: 30 # Delete conversations older than this
|
max_age_days: 30 # Delete conversations older than this
|
||||||
|
|
||||||
# === MEMORY OPTIMIZATION ===
|
# === MEMORY OPTIMIZATION ===
|
||||||
memory:
|
memory:
|
||||||
enabled: true # Enable rolling summary memory
|
enabled: true # Enable rolling summary memory
|
||||||
window_size: 4 # Recent message pairs to keep in full
|
window_size: 4 # Recent message pairs to keep in full
|
||||||
summarize_threshold: 8 # Messages before re-summarizing
|
summarize_threshold: 8 # Messages before re-summarizing
|
||||||
|
|
||||||
# === MESH CONTEXT ===
|
# === MESH CONTEXT ===
|
||||||
context:
|
context:
|
||||||
enabled: true # Observe channel traffic for LLM context
|
enabled: true # Observe channel traffic for LLM context
|
||||||
observe_channels: [] # Channel indices to observe (empty = all)
|
observe_channels: [] # Channel indices to observe (empty = all)
|
||||||
ignore_nodes: [] # Node IDs to exclude from observation
|
ignore_nodes: [] # Node IDs to exclude from observation
|
||||||
max_age: 2592000 # Max age in seconds (default 30 days)
|
max_age: 2592000 # Max age in seconds (default 30 days)
|
||||||
max_context_items: 20 # Max observations injected into LLM context
|
max_context_items: 20 # Max observations injected into LLM context
|
||||||
|
|
||||||
# === LLM BACKEND ===
|
# === LLM BACKEND ===
|
||||||
llm:
|
llm:
|
||||||
backend: openai # openai | anthropic | google
|
backend: openai # openai | anthropic | google
|
||||||
api_key: "" # API key (or use LLM_API_KEY env var)
|
api_key: "" # API key (or use LLM_API_KEY env var)
|
||||||
base_url: https://api.openai.com/v1 # API base URL
|
base_url: https://api.openai.com/v1 # API base URL
|
||||||
model: gpt-4o-mini # Model name
|
model: gpt-4o-mini # Model name
|
||||||
timeout: 30 # Request timeout (seconds)
|
timeout: 30 # Request timeout (seconds)
|
||||||
system_prompt: >-
|
system_prompt: >-
|
||||||
You are a helpful assistant on a Meshtastic mesh network.
|
You are a helpful assistant on a Meshtastic mesh network.
|
||||||
Keep responses very brief - 1-2 short sentences, under 300 characters.
|
Keep responses very brief - 1-2 short sentences, under 300 characters.
|
||||||
Only give longer answers if the user explicitly asks for detail or explanation.
|
Only give longer answers if the user explicitly asks for detail or explanation.
|
||||||
Be concise but friendly. No markdown formatting.
|
Be concise but friendly. No markdown formatting.
|
||||||
google_grounding: false # Enable Google Search grounding (Gemini only, $35/1k queries)
|
google_grounding: false # Enable Google Search grounding (Gemini only, $35/1k queries)
|
||||||
|
|
||||||
# === WEATHER ===
|
# === WEATHER ===
|
||||||
weather:
|
weather:
|
||||||
primary: openmeteo # openmeteo | wttr | llm
|
primary: openmeteo # openmeteo | wttr | llm
|
||||||
fallback: llm # openmeteo | wttr | llm | none
|
fallback: llm # openmeteo | wttr | llm | none
|
||||||
default_location: "" # Default location for !weather (optional)
|
default_location: "" # Default location for !weather (optional)
|
||||||
|
|
||||||
# === MESHMONITOR INTEGRATION ===
|
# === MESHMONITOR INTEGRATION ===
|
||||||
meshmonitor:
|
meshmonitor:
|
||||||
enabled: false # Enable MeshMonitor trigger sync
|
enabled: false # Enable MeshMonitor trigger sync
|
||||||
url: "" # MeshMonitor web UI URL (e.g. http://192.168.1.100:3333)
|
url: "" # MeshMonitor web UI URL (e.g. http://192.168.1.100:3333)
|
||||||
inject_into_prompt: true # Include trigger list in LLM prompt
|
inject_into_prompt: true # Include trigger list in LLM prompt
|
||||||
refresh_interval: 300 # Seconds between trigger refreshes
|
refresh_interval: 300 # Seconds between trigger refreshes
|
||||||
|
|
||||||
# === KNOWLEDGE BASE (RAG) ===
|
# === KNOWLEDGE BASE (RAG) ===
|
||||||
knowledge:
|
knowledge:
|
||||||
enabled: false # Enable knowledge base search
|
enabled: false # Enable knowledge base search
|
||||||
db_path: "" # Path to knowledge SQLite database
|
db_path: "" # Path to knowledge SQLite database
|
||||||
top_k: 5 # Number of chunks to retrieve per query
|
top_k: 5 # Number of chunks to retrieve per query
|
||||||
|
|
||||||
# === MESH DATA SOURCES ===
|
# === MESH DATA SOURCES ===
|
||||||
# Connect to Meshview and/or MeshMonitor instances for live mesh
|
# Connect to Meshview and/or MeshMonitor instances for live mesh
|
||||||
# network analysis. Supports multiple sources. Configure via TUI
|
# network analysis. Supports multiple sources. Configure via TUI
|
||||||
# with meshai --config (Mesh Sources menu).
|
# with meshai --config (Mesh Sources menu).
|
||||||
#
|
#
|
||||||
# mesh_sources:
|
# mesh_sources:
|
||||||
# - name: "my-meshview"
|
# - name: "my-meshview"
|
||||||
# type: meshview
|
# type: meshview
|
||||||
# url: "https://meshview.example.com"
|
# url: "https://meshview.example.com"
|
||||||
# refresh_interval: 300
|
# refresh_interval: 300
|
||||||
# enabled: true
|
# enabled: true
|
||||||
#
|
#
|
||||||
# - name: "my-meshmonitor"
|
# - name: "my-meshmonitor"
|
||||||
# type: meshmonitor
|
# type: meshmonitor
|
||||||
# url: "http://192.168.1.100:3333"
|
# url: "http://192.168.1.100:3333"
|
||||||
# api_token: "${MM_API_TOKEN}"
|
# api_token: "${MM_API_TOKEN}"
|
||||||
# refresh_interval: 300
|
# refresh_interval: 300
|
||||||
# enabled: true
|
# enabled: true
|
||||||
#
|
#
|
||||||
# - name: "mqtt-broker"
|
# - name: "mqtt-broker"
|
||||||
# type: mqtt
|
# type: mqtt
|
||||||
# host: "mqtt.meshtastic.org"
|
# host: "mqtt.meshtastic.org"
|
||||||
# port: 1883
|
# port: 1883
|
||||||
# username: "meshdev"
|
# username: "meshdev"
|
||||||
# password: "large4cats"
|
# password: "large4cats"
|
||||||
# topic_root: "msh/US"
|
# topic_root: "msh/US"
|
||||||
# use_tls: false
|
# use_tls: false
|
||||||
# enabled: true
|
# enabled: true
|
||||||
mesh_sources: []
|
mesh_sources: []
|
||||||
|
|
||||||
# === MESH INTELLIGENCE ===
|
# === MESH INTELLIGENCE ===
|
||||||
# Geographic clustering and health scoring for mesh analysis.
|
# Geographic clustering and health scoring for mesh analysis.
|
||||||
# Requires mesh_sources to be configured with at least one data source.
|
# Requires mesh_sources to be configured with at least one data source.
|
||||||
#
|
#
|
||||||
# mesh_intelligence:
|
# mesh_intelligence:
|
||||||
# enabled: true
|
# enabled: true
|
||||||
# region_radius_miles: 40.0 # Radius for region clustering
|
# region_radius_miles: 40.0 # Radius for region clustering
|
||||||
# locality_radius_miles: 8.0 # Radius for locality clustering
|
# locality_radius_miles: 8.0 # Radius for locality clustering
|
||||||
# offline_threshold_hours: 2 # Hours before node considered offline
|
# offline_threshold_hours: 2 # Hours before node considered offline
|
||||||
# packet_threshold: 500 # Non-text packets per 24h to flag
|
# packet_threshold: 500 # Non-text packets per 24h to flag
|
||||||
# battery_warning_percent: 30 # Battery level for warnings
|
# battery_warning_percent: 30 # Battery level for warnings
|
||||||
# infra_overrides: [] # Node IDs to exclude from infrastructure
|
# infra_overrides: [] # Node IDs to exclude from infrastructure
|
||||||
# region_labels: {} # Override auto-names: {"Twin Falls": "Magic Valley"}
|
# region_labels: {} # Override auto-names: {"Twin Falls": "Magic Valley"}
|
||||||
mesh_intelligence:
|
mesh_intelligence:
|
||||||
enabled: false
|
enabled: false
|
||||||
region_radius_miles: 40.0
|
region_radius_miles: 40.0
|
||||||
locality_radius_miles: 8.0
|
locality_radius_miles: 8.0
|
||||||
offline_threshold_hours: 2
|
offline_threshold_hours: 2
|
||||||
packet_threshold: 500
|
packet_threshold: 500
|
||||||
battery_warning_percent: 30
|
battery_warning_percent: 30
|
||||||
infra_overrides: []
|
infra_overrides: []
|
||||||
region_labels: {}
|
region_labels: {}
|
||||||
|
|
||||||
# === ENVIRONMENTAL FEEDS ===
|
# === ENVIRONMENTAL FEEDS ===
|
||||||
# Live situational awareness from NWS, NOAA Space Weather, and Open-Meteo.
|
# Live situational awareness from NWS, NOAA Space Weather, and Open-Meteo.
|
||||||
# Provides weather alerts, HF propagation assessment, and tropospheric ducting.
|
# Provides weather alerts, HF propagation assessment, and tropospheric ducting.
|
||||||
#
|
#
|
||||||
environmental:
|
environmental:
|
||||||
enabled: false
|
enabled: false
|
||||||
nws_zones:
|
nws_zones:
|
||||||
- "IDZ016" # Western Magic Valley
|
- "IDZ016" # Western Magic Valley
|
||||||
- "IDZ030" # Southern Twin Falls County
|
- "IDZ030" # Southern Twin Falls County
|
||||||
|
|
||||||
# NWS Weather Alerts (api.weather.gov)
|
# NWS Weather Alerts (api.weather.gov)
|
||||||
nws:
|
nws:
|
||||||
enabled: true
|
enabled: true
|
||||||
tick_seconds: 60
|
tick_seconds: 60
|
||||||
areas: ["ID"]
|
areas: ["ID"]
|
||||||
severity_min: "moderate"
|
severity_min: "moderate"
|
||||||
user_agent: "(meshai.example.com, ops@example.com)" # REQUIRED by NWS
|
user_agent: "(meshai.example.com, ops@example.com)" # REQUIRED by NWS
|
||||||
|
|
||||||
# NOAA Space Weather (services.swpc.noaa.gov)
|
# NOAA Space Weather (services.swpc.noaa.gov)
|
||||||
swpc:
|
swpc:
|
||||||
enabled: true
|
enabled: true
|
||||||
|
|
||||||
# Tropospheric ducting assessment (Open-Meteo GFS, no auth)
|
# Tropospheric ducting assessment (Open-Meteo GFS, no auth)
|
||||||
ducting:
|
ducting:
|
||||||
enabled: true
|
enabled: true
|
||||||
tick_seconds: 10800 # 3 hours
|
tick_seconds: 10800 # 3 hours
|
||||||
latitude: 42.56 # center of mesh coverage area
|
latitude: 42.56 # center of mesh coverage area
|
||||||
longitude: -114.47
|
longitude: -114.47
|
||||||
|
|
||||||
# NIFC Fire Perimeters (Phase 2)
|
# NIFC Fire Perimeters (Phase 2)
|
||||||
fires:
|
fires:
|
||||||
enabled: false
|
enabled: false
|
||||||
tick_seconds: 600
|
tick_seconds: 600
|
||||||
state: "US-ID"
|
state: "US-ID"
|
||||||
|
|
||||||
# Avalanche Advisories (Phase 2)
|
# Avalanche Advisories (Phase 2)
|
||||||
avalanche:
|
avalanche:
|
||||||
enabled: false
|
enabled: false
|
||||||
tick_seconds: 1800
|
tick_seconds: 1800
|
||||||
center_ids: ["SNFAC"]
|
center_ids: ["SNFAC"]
|
||||||
season_months: [12, 1, 2, 3, 4]
|
season_months: [12, 1, 2, 3, 4]
|
||||||
|
|
||||||
# USGS Stream Gauges (waterservices.usgs.gov)
|
# USGS Stream Gauges (waterservices.usgs.gov)
|
||||||
# Find site IDs at https://waterdata.usgs.gov/nwis
|
# Find site IDs at https://waterdata.usgs.gov/nwis
|
||||||
usgs:
|
usgs:
|
||||||
enabled: false
|
enabled: false
|
||||||
tick_seconds: 900 # Min 15 min per USGS guidelines
|
tick_seconds: 900 # Min 15 min per USGS guidelines
|
||||||
sites: [] # e.g. ["13090500", "13088000"]
|
sites: [] # e.g. ["13090500", "13088000"]
|
||||||
|
|
||||||
# TomTom Traffic Flow (api.tomtom.com, requires API key)
|
# TomTom Traffic Flow (api.tomtom.com, requires API key)
|
||||||
traffic:
|
traffic:
|
||||||
enabled: false
|
enabled: false
|
||||||
tick_seconds: 300
|
tick_seconds: 300
|
||||||
api_key: "" # Get key at developer.tomtom.com
|
api_key: "" # Get key at developer.tomtom.com
|
||||||
corridors: []
|
corridors: []
|
||||||
# Example corridors:
|
# Example corridors:
|
||||||
# - name: "I-84 Twin Falls"
|
# - name: "I-84 Twin Falls"
|
||||||
# lat: 42.56
|
# lat: 42.56
|
||||||
# lon: -114.47
|
# lon: -114.47
|
||||||
|
|
||||||
# 511 Road Conditions (state-specific, configurable base URL)
|
# 511 Road Conditions (state-specific, configurable base URL)
|
||||||
roads511:
|
roads511:
|
||||||
enabled: false
|
enabled: false
|
||||||
tick_seconds: 300
|
tick_seconds: 300
|
||||||
api_key: ""
|
api_key: ""
|
||||||
base_url: "" # e.g. "https://511.idaho.gov/api/v2"
|
base_url: "" # e.g. "https://511.idaho.gov/api/v2"
|
||||||
endpoints: ["/get/event"]
|
endpoints: ["/get/event"]
|
||||||
bbox: [] # [west, south, east, north]
|
bbox: [] # [west, south, east, north]
|
||||||
|
|
||||||
# NASA FIRMS Satellite Fire Detection
|
# NASA FIRMS Satellite Fire Detection
|
||||||
# Early warning via satellite hotspots, hours before official perimeters
|
# Early warning via satellite hotspots, hours before official perimeters
|
||||||
# Get MAP_KEY at: https://firms.modaps.eosdis.nasa.gov/api/area/
|
# Get MAP_KEY at: https://firms.modaps.eosdis.nasa.gov/api/area/
|
||||||
firms:
|
firms:
|
||||||
enabled: false
|
enabled: false
|
||||||
tick_seconds: 1800 # 30 min default
|
tick_seconds: 1800 # 30 min default
|
||||||
map_key: "" # Required - NASA FIRMS MAP_KEY
|
map_key: "" # Required - NASA FIRMS MAP_KEY
|
||||||
source: "VIIRS_SNPP_NRT" # VIIRS_SNPP_NRT, VIIRS_NOAA20_NRT, MODIS_NRT
|
source: "VIIRS_SNPP_NRT" # VIIRS_SNPP_NRT, VIIRS_NOAA20_NRT, MODIS_NRT
|
||||||
bbox: [] # [west, south, east, north] - Required
|
bbox: [] # [west, south, east, north] - Required
|
||||||
day_range: 1 # 1-10 days of data
|
day_range: 1 # 1-10 days of data
|
||||||
confidence_min: "nominal" # low, nominal, high
|
confidence_min: "nominal" # low, nominal, high
|
||||||
proximity_km: 10.0 # km to match known fire perimeters
|
proximity_km: 10.0 # km to match known fire perimeters
|
||||||
|
|
||||||
|
|
||||||
# === NOTIFICATION DELIVERY (TRANSITIONAL) ===
|
# === NOTIFICATION DELIVERY (TRANSITIONAL) ===
|
||||||
# NOTE: This notifications schema will be replaced in v0.3 by the 8-toggle model.
|
# NOTE: This notifications schema will be replaced in v0.3 by the 8-toggle model.
|
||||||
# These rule examples are transitional until Phase 1.2 lands. Do not extend.
|
# These rule examples are transitional until Phase 1.2 lands. Do not extend.
|
||||||
# Severity levels: routine (informational), priority (needs attention), immediate (act now)
|
# Severity levels: routine (informational), priority (needs attention), immediate (act now)
|
||||||
#
|
#
|
||||||
# Route alerts to channels (mesh, email, webhook) based on rules.
|
# Route alerts to channels (mesh, email, webhook) based on rules.
|
||||||
# Categories match alert types from alert_engine.py.
|
# Categories match alert types from alert_engine.py.
|
||||||
notifications:
|
notifications:
|
||||||
enabled: false
|
enabled: false
|
||||||
quiet_hours_enabled: true # Master toggle for quiet hours feature
|
quiet_hours_enabled: true # Master toggle for quiet hours feature
|
||||||
quiet_hours_start: "22:00" # Suppress non-emergency alerts during quiet hours
|
quiet_hours_start: "22:00" # Suppress non-emergency alerts during quiet hours
|
||||||
quiet_hours_end: "06:00"
|
quiet_hours_end: "06:00"
|
||||||
|
|
||||||
# Notification rules - each rule is self-contained with its own delivery config
|
# Digest scheduler settings
|
||||||
# Default baseline rules are created on fresh install
|
# The digest collects priority/routine events and delivers a summary
|
||||||
rules:
|
# at the configured time to rules with trigger_type='schedule' and
|
||||||
# Emergency Broadcast - all emergencies go out immediately
|
# schedule_match='digest'.
|
||||||
- name: "Emergency Broadcast"
|
digest:
|
||||||
enabled: true
|
schedule: "07:00" # HH:MM local time to fire digest
|
||||||
trigger_type: condition
|
include: [] # Toggle names to include (empty = default set)
|
||||||
categories: [] # Empty = all categories
|
# Default set: weather, fire, seismic, avalanche, roads, mesh_health, tracking, other
|
||||||
min_severity: "immediate"
|
# Excludes rf_propagation by default
|
||||||
delivery_type: mesh_broadcast
|
# Example: include: ["weather", "fire", "mesh_health"]
|
||||||
broadcast_channel: 0
|
|
||||||
cooldown_minutes: 5
|
# Notification rules - each rule is self-contained with its own delivery config
|
||||||
override_quiet: true # Send even during quiet hours
|
# Default baseline rules are created on fresh install
|
||||||
|
rules:
|
||||||
# Infrastructure Down - critical node and infrastructure offline alerts
|
# Emergency Broadcast - all emergencies go out immediately
|
||||||
- name: "Infrastructure Down"
|
- name: "Emergency Broadcast"
|
||||||
enabled: true
|
enabled: true
|
||||||
trigger_type: condition
|
trigger_type: condition
|
||||||
categories: ["infra_offline", "critical_node_down"]
|
categories: [] # Empty = all categories
|
||||||
min_severity: "priority"
|
min_severity: "immediate"
|
||||||
delivery_type: mesh_broadcast
|
delivery_type: mesh_broadcast
|
||||||
broadcast_channel: 0
|
broadcast_channel: 0
|
||||||
cooldown_minutes: 30
|
cooldown_minutes: 5
|
||||||
override_quiet: false
|
override_quiet: true # Send even during quiet hours
|
||||||
|
|
||||||
# Fire Alert - wildfire proximity and new ignition
|
# Infrastructure Down - critical node and infrastructure offline alerts
|
||||||
- name: "Fire Alert"
|
- name: "Infrastructure Down"
|
||||||
enabled: true
|
enabled: true
|
||||||
trigger_type: condition
|
trigger_type: condition
|
||||||
categories: ["wildfire_proximity", "new_ignition"]
|
categories: ["infra_offline", "critical_node_down"]
|
||||||
min_severity: "routine"
|
min_severity: "priority"
|
||||||
delivery_type: mesh_broadcast
|
delivery_type: mesh_broadcast
|
||||||
broadcast_channel: 0
|
broadcast_channel: 0
|
||||||
cooldown_minutes: 60
|
cooldown_minutes: 30
|
||||||
override_quiet: false
|
override_quiet: false
|
||||||
|
|
||||||
# Severe Weather - weather warnings
|
# Fire Alert - wildfire proximity and new ignition
|
||||||
- name: "Severe Weather"
|
- name: "Fire Alert"
|
||||||
enabled: true
|
enabled: true
|
||||||
trigger_type: condition
|
trigger_type: condition
|
||||||
categories: ["weather_warning"]
|
categories: ["wildfire_proximity", "new_ignition"]
|
||||||
min_severity: "priority"
|
min_severity: "routine"
|
||||||
delivery_type: mesh_broadcast
|
delivery_type: mesh_broadcast
|
||||||
broadcast_channel: 0
|
broadcast_channel: 0
|
||||||
cooldown_minutes: 30
|
cooldown_minutes: 60
|
||||||
override_quiet: false
|
override_quiet: false
|
||||||
|
|
||||||
# Example: Fire alerts -> email
|
# Severe Weather - weather warnings
|
||||||
# - name: "Fire Alerts Email"
|
- name: "Severe Weather"
|
||||||
# enabled: true
|
enabled: true
|
||||||
# trigger_type: condition
|
trigger_type: condition
|
||||||
# categories: ["wildfire_proximity", "new_ignition"]
|
categories: ["weather_warning"]
|
||||||
# min_severity: "routine"
|
min_severity: "priority"
|
||||||
# delivery_type: email
|
delivery_type: mesh_broadcast
|
||||||
# smtp_host: "smtp.gmail.com"
|
broadcast_channel: 0
|
||||||
# smtp_port: 587
|
cooldown_minutes: 30
|
||||||
# smtp_user: "you@gmail.com"
|
override_quiet: false
|
||||||
# smtp_password: "${SMTP_PASSWORD}"
|
|
||||||
# smtp_tls: true
|
# Example: Morning Digest -> mesh broadcast
|
||||||
# from_address: "meshai@yourdomain.com"
|
# Delivers the accumulated digest at the configured schedule time
|
||||||
# recipients: ["admin@yourdomain.com"]
|
# - name: "Morning Digest Mesh"
|
||||||
# cooldown_minutes: 30
|
# enabled: false
|
||||||
|
# trigger_type: schedule
|
||||||
# Example: All warnings -> Discord webhook
|
# schedule_match: "digest" # Required for digest delivery
|
||||||
# - name: "Discord Alerts"
|
# delivery_type: mesh_broadcast
|
||||||
# enabled: true
|
# broadcast_channel: 0
|
||||||
# trigger_type: condition
|
|
||||||
# categories: []
|
# Example: Morning Digest -> email
|
||||||
# min_severity: "priority"
|
# - name: "Morning Digest Email"
|
||||||
# delivery_type: webhook
|
# enabled: false
|
||||||
# webhook_url: "https://discord.com/api/webhooks/..."
|
# trigger_type: schedule
|
||||||
# cooldown_minutes: 10
|
# schedule_match: "digest"
|
||||||
|
# delivery_type: email
|
||||||
# Example: Daily health report -> mesh broadcast
|
# smtp_host: "smtp.gmail.com"
|
||||||
# - name: "Morning Briefing"
|
# smtp_port: 587
|
||||||
# enabled: true
|
# smtp_user: "you@gmail.com"
|
||||||
# trigger_type: schedule
|
# smtp_password: "${SMTP_PASSWORD}"
|
||||||
# schedule_frequency: daily
|
# smtp_tls: true
|
||||||
# schedule_time: "07:00"
|
# from_address: "meshai@yourdomain.com"
|
||||||
# message_type: mesh_health_summary
|
# recipients: ["admin@yourdomain.com"]
|
||||||
# delivery_type: mesh_broadcast
|
|
||||||
# broadcast_channel: 0
|
# Example: Fire alerts -> email
|
||||||
|
# - name: "Fire Alerts Email"
|
||||||
# Example: Rule with no delivery (matches and logs, but doesn't send)
|
# enabled: true
|
||||||
# - name: "Monitor Only"
|
# trigger_type: condition
|
||||||
# enabled: true
|
# categories: ["wildfire_proximity", "new_ignition"]
|
||||||
# trigger_type: condition
|
# min_severity: "routine"
|
||||||
# categories: ["battery_warning"]
|
# delivery_type: email
|
||||||
# min_severity: "priority"
|
# smtp_host: "smtp.gmail.com"
|
||||||
# delivery_type: "" # Empty = no delivery, just tracks matches
|
# smtp_port: 587
|
||||||
|
# smtp_user: "you@gmail.com"
|
||||||
# === WEB DASHBOARD ===
|
# smtp_password: "${SMTP_PASSWORD}"
|
||||||
dashboard:
|
# smtp_tls: true
|
||||||
enabled: true
|
# from_address: "meshai@yourdomain.com"
|
||||||
port: 8080
|
# recipients: ["admin@yourdomain.com"]
|
||||||
host: "0.0.0.0"
|
# cooldown_minutes: 30
|
||||||
|
|
||||||
|
# Example: All warnings -> Discord webhook
|
||||||
|
# - name: "Discord Alerts"
|
||||||
|
# enabled: true
|
||||||
|
# trigger_type: condition
|
||||||
|
# categories: []
|
||||||
|
# min_severity: "priority"
|
||||||
|
# delivery_type: webhook
|
||||||
|
# webhook_url: "https://discord.com/api/webhooks/..."
|
||||||
|
# cooldown_minutes: 10
|
||||||
|
|
||||||
|
# Example: Rule with no delivery (matches and logs, but doesn't send)
|
||||||
|
# - name: "Monitor Only"
|
||||||
|
# enabled: true
|
||||||
|
# trigger_type: condition
|
||||||
|
# categories: ["battery_warning"]
|
||||||
|
# min_severity: "priority"
|
||||||
|
# delivery_type: "" # Empty = no delivery, just tracks matches
|
||||||
|
|
||||||
|
# === WEB DASHBOARD ===
|
||||||
|
dashboard:
|
||||||
|
enabled: true
|
||||||
|
port: 8080
|
||||||
|
host: "0.0.0.0"
|
||||||
|
|
|
||||||
1510
meshai/config.py
1510
meshai/config.py
File diff suppressed because it is too large
Load diff
|
|
@ -1,17 +1,23 @@
|
||||||
"""Notification pipeline package.
|
"""Notification pipeline package.
|
||||||
|
|
||||||
Phase 2.1 + 2.2 + 2.3a:
|
Phase 2.1 + 2.2 + 2.3a + 2.3b:
|
||||||
- EventBus: pub/sub ingress
|
- EventBus: pub/sub ingress
|
||||||
- Inhibitor: suppresses redundant events by inhibit_keys
|
- Inhibitor: suppresses redundant events by inhibit_keys
|
||||||
- Grouper: coalesces events sharing group_key within a window
|
- Grouper: coalesces events sharing group_key within a window
|
||||||
- SeverityRouter: forks immediate vs digest
|
- SeverityRouter: forks immediate vs digest
|
||||||
- Dispatcher: routes immediate via channels (existing rules schema)
|
- Dispatcher: routes immediate via channels (existing rules schema)
|
||||||
- DigestAccumulator: tracks priority/routine events for periodic digest
|
- DigestAccumulator: tracks priority/routine events for periodic digest
|
||||||
|
- DigestScheduler: fires digest at configured time (Phase 2.3b)
|
||||||
|
|
||||||
Usage:
|
Usage:
|
||||||
from meshai.notifications.pipeline import build_pipeline
|
from meshai.notifications.pipeline import build_pipeline, start_pipeline, stop_pipeline
|
||||||
bus = build_pipeline(config)
|
bus = build_pipeline(config)
|
||||||
bus.emit(event)
|
bus.emit(event)
|
||||||
|
|
||||||
|
# Async lifecycle
|
||||||
|
scheduler = await start_pipeline(bus, config)
|
||||||
|
...
|
||||||
|
await stop_pipeline(scheduler)
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from meshai.notifications.channels import create_channel
|
from meshai.notifications.channels import create_channel
|
||||||
|
|
@ -24,13 +30,26 @@ from meshai.notifications.pipeline.dispatcher import Dispatcher
|
||||||
from meshai.notifications.pipeline.inhibitor import Inhibitor
|
from meshai.notifications.pipeline.inhibitor import Inhibitor
|
||||||
from meshai.notifications.pipeline.grouper import Grouper
|
from meshai.notifications.pipeline.grouper import Grouper
|
||||||
from meshai.notifications.pipeline.digest import DigestAccumulator, Digest
|
from meshai.notifications.pipeline.digest import DigestAccumulator, Digest
|
||||||
|
from meshai.notifications.pipeline.scheduler import DigestScheduler
|
||||||
|
|
||||||
|
|
||||||
def build_pipeline(config) -> EventBus:
|
def build_pipeline(config) -> EventBus:
|
||||||
"""Build the pipeline and return the EventBus."""
|
"""Build the pipeline and return the EventBus.
|
||||||
|
|
||||||
|
Components are stashed on bus._pipeline_components for lifecycle use.
|
||||||
|
"""
|
||||||
bus = EventBus()
|
bus = EventBus()
|
||||||
dispatcher = Dispatcher(config, create_channel)
|
dispatcher = Dispatcher(config, create_channel)
|
||||||
digest = DigestAccumulator()
|
|
||||||
|
# Build include_toggles from config
|
||||||
|
digest_cfg = getattr(config.notifications, "digest", None)
|
||||||
|
include_toggles = None
|
||||||
|
if digest_cfg is not None:
|
||||||
|
include_list = getattr(digest_cfg, "include", None)
|
||||||
|
if include_list:
|
||||||
|
include_toggles = list(include_list)
|
||||||
|
|
||||||
|
digest = DigestAccumulator(include_toggles=include_toggles)
|
||||||
severity_router = SeverityRouter(
|
severity_router = SeverityRouter(
|
||||||
immediate_handler=dispatcher.dispatch,
|
immediate_handler=dispatcher.dispatch,
|
||||||
digest_handler=digest.enqueue,
|
digest_handler=digest.enqueue,
|
||||||
|
|
@ -38,6 +57,16 @@ def build_pipeline(config) -> EventBus:
|
||||||
grouper = Grouper(next_handler=severity_router.handle)
|
grouper = Grouper(next_handler=severity_router.handle)
|
||||||
inhibitor = Inhibitor(next_handler=grouper.handle)
|
inhibitor = Inhibitor(next_handler=grouper.handle)
|
||||||
bus.subscribe(inhibitor.handle)
|
bus.subscribe(inhibitor.handle)
|
||||||
|
|
||||||
|
# Stash components for lifecycle management
|
||||||
|
bus._pipeline_components = {
|
||||||
|
"inhibitor": inhibitor,
|
||||||
|
"grouper": grouper,
|
||||||
|
"severity_router": severity_router,
|
||||||
|
"dispatcher": dispatcher,
|
||||||
|
"digest": digest,
|
||||||
|
}
|
||||||
|
|
||||||
return bus
|
return bus
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -48,7 +77,16 @@ def build_pipeline_components(config) -> tuple:
|
||||||
"""
|
"""
|
||||||
bus = EventBus()
|
bus = EventBus()
|
||||||
dispatcher = Dispatcher(config, create_channel)
|
dispatcher = Dispatcher(config, create_channel)
|
||||||
digest = DigestAccumulator()
|
|
||||||
|
# Build include_toggles from config
|
||||||
|
digest_cfg = getattr(config.notifications, "digest", None)
|
||||||
|
include_toggles = None
|
||||||
|
if digest_cfg is not None:
|
||||||
|
include_list = getattr(digest_cfg, "include", None)
|
||||||
|
if include_list:
|
||||||
|
include_toggles = list(include_list)
|
||||||
|
|
||||||
|
digest = DigestAccumulator(include_toggles=include_toggles)
|
||||||
severity_router = SeverityRouter(
|
severity_router = SeverityRouter(
|
||||||
immediate_handler=dispatcher.dispatch,
|
immediate_handler=dispatcher.dispatch,
|
||||||
digest_handler=digest.enqueue,
|
digest_handler=digest.enqueue,
|
||||||
|
|
@ -59,6 +97,45 @@ def build_pipeline_components(config) -> tuple:
|
||||||
return bus, inhibitor, grouper, severity_router, dispatcher, digest
|
return bus, inhibitor, grouper, severity_router, dispatcher, digest
|
||||||
|
|
||||||
|
|
||||||
|
async def start_pipeline(bus: EventBus, config) -> DigestScheduler:
|
||||||
|
"""Start the pipeline's async components (scheduler).
|
||||||
|
|
||||||
|
Args:
|
||||||
|
bus: EventBus returned by build_pipeline()
|
||||||
|
config: Config object with notifications.digest settings
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
DigestScheduler instance (running). Call stop_pipeline() to stop.
|
||||||
|
"""
|
||||||
|
components = getattr(bus, "_pipeline_components", None)
|
||||||
|
if components is None:
|
||||||
|
raise RuntimeError("bus missing _pipeline_components; use build_pipeline()")
|
||||||
|
|
||||||
|
digest = components["digest"]
|
||||||
|
|
||||||
|
scheduler = DigestScheduler(
|
||||||
|
accumulator=digest,
|
||||||
|
config=config,
|
||||||
|
channel_factory=create_channel,
|
||||||
|
)
|
||||||
|
await scheduler.start()
|
||||||
|
|
||||||
|
# Stash scheduler for stop_pipeline
|
||||||
|
bus._pipeline_scheduler = scheduler
|
||||||
|
|
||||||
|
return scheduler
|
||||||
|
|
||||||
|
|
||||||
|
async def stop_pipeline(scheduler: DigestScheduler) -> None:
|
||||||
|
"""Stop the pipeline's async components.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
scheduler: DigestScheduler returned by start_pipeline()
|
||||||
|
"""
|
||||||
|
if scheduler is not None:
|
||||||
|
await scheduler.stop()
|
||||||
|
|
||||||
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
"EventBus",
|
"EventBus",
|
||||||
"SeverityRouter",
|
"SeverityRouter",
|
||||||
|
|
@ -68,7 +145,10 @@ __all__ = [
|
||||||
"Grouper",
|
"Grouper",
|
||||||
"DigestAccumulator",
|
"DigestAccumulator",
|
||||||
"Digest",
|
"Digest",
|
||||||
|
"DigestScheduler",
|
||||||
"build_pipeline",
|
"build_pipeline",
|
||||||
"build_pipeline_components",
|
"build_pipeline_components",
|
||||||
|
"start_pipeline",
|
||||||
|
"stop_pipeline",
|
||||||
"get_bus",
|
"get_bus",
|
||||||
]
|
]
|
||||||
|
|
|
||||||
213
meshai/notifications/pipeline/scheduler.py
Normal file
213
meshai/notifications/pipeline/scheduler.py
Normal file
|
|
@ -0,0 +1,213 @@
|
||||||
|
"""Digest scheduler — fires the digest at a configured time of day.
|
||||||
|
|
||||||
|
Reads schedule and channel routing from config; calls
|
||||||
|
accumulator.render_digest() at the scheduled time; delivers the
|
||||||
|
result to all rules matching trigger_type=='schedule' and
|
||||||
|
schedule_match=='digest'.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
import time
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
from typing import Callable, Optional
|
||||||
|
|
||||||
|
from meshai.notifications.pipeline.digest import DigestAccumulator
|
||||||
|
|
||||||
|
|
||||||
|
class DigestScheduler:
|
||||||
|
"""Fires digest at configured time and routes to matching channels."""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
accumulator: DigestAccumulator,
|
||||||
|
config,
|
||||||
|
channel_factory: Callable,
|
||||||
|
clock: Optional[Callable[[], float]] = None,
|
||||||
|
sleep: Optional[Callable[[float], "asyncio.Future"]] = None,
|
||||||
|
):
|
||||||
|
self._accumulator = accumulator
|
||||||
|
self._config = config
|
||||||
|
self._channel_factory = channel_factory
|
||||||
|
self._clock = clock or time.time
|
||||||
|
self._sleep = sleep or asyncio.sleep
|
||||||
|
self._task: Optional[asyncio.Task] = None
|
||||||
|
self._stop_event: Optional[asyncio.Event] = None
|
||||||
|
self._last_fire_at: float = 0.0
|
||||||
|
self._logger = logging.getLogger("meshai.pipeline.scheduler")
|
||||||
|
|
||||||
|
async def start(self) -> None:
|
||||||
|
"""Begin the scheduler loop as an asyncio task."""
|
||||||
|
if self._task is not None and not self._task.done():
|
||||||
|
raise RuntimeError("Scheduler already running")
|
||||||
|
self._stop_event = asyncio.Event()
|
||||||
|
self._task = asyncio.create_task(self._run(), name="digest-scheduler")
|
||||||
|
self._logger.info(
|
||||||
|
f"Digest scheduler started, schedule={self._schedule_str()!r}"
|
||||||
|
)
|
||||||
|
|
||||||
|
async def stop(self) -> None:
|
||||||
|
"""Signal stop and wait for the task to finish."""
|
||||||
|
if self._task is None:
|
||||||
|
return
|
||||||
|
if self._stop_event:
|
||||||
|
self._stop_event.set()
|
||||||
|
self._task.cancel()
|
||||||
|
try:
|
||||||
|
await self._task
|
||||||
|
except (asyncio.CancelledError, Exception):
|
||||||
|
# Cancellation is expected; other exceptions already logged
|
||||||
|
pass
|
||||||
|
self._task = None
|
||||||
|
self._logger.info("Digest scheduler stopped")
|
||||||
|
|
||||||
|
async def _run(self) -> None:
|
||||||
|
"""Main loop: sleep until next fire, fire, repeat."""
|
||||||
|
try:
|
||||||
|
while self._stop_event and not self._stop_event.is_set():
|
||||||
|
now = self._clock()
|
||||||
|
next_fire = self._next_fire_at(now)
|
||||||
|
delay = max(0.0, next_fire - now)
|
||||||
|
self._logger.info(
|
||||||
|
f"Next digest at {datetime.fromtimestamp(next_fire):%Y-%m-%d %H:%M}, "
|
||||||
|
f"sleeping {delay:.0f}s"
|
||||||
|
)
|
||||||
|
# Interruptible sleep — wakes early if stop() is called
|
||||||
|
try:
|
||||||
|
await asyncio.wait_for(
|
||||||
|
self._stop_event.wait(),
|
||||||
|
timeout=delay,
|
||||||
|
)
|
||||||
|
# If we got here without timeout, stop was requested
|
||||||
|
return
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
pass # Timeout fired = digest time arrived
|
||||||
|
|
||||||
|
if self._stop_event.is_set():
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
await self._fire(self._clock())
|
||||||
|
except Exception:
|
||||||
|
self._logger.exception("Digest fire failed; will retry next cycle")
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
raise
|
||||||
|
except Exception:
|
||||||
|
self._logger.exception("Scheduler loop crashed unexpectedly")
|
||||||
|
raise
|
||||||
|
|
||||||
|
async def _fire(self, now: float) -> None:
|
||||||
|
"""Render and deliver one digest."""
|
||||||
|
self._logger.info(f"Firing digest at {datetime.fromtimestamp(now):%H:%M}")
|
||||||
|
digest = self._accumulator.render_digest(now)
|
||||||
|
self._last_fire_at = now
|
||||||
|
|
||||||
|
rules = self._matching_rules()
|
||||||
|
if not rules:
|
||||||
|
self._logger.warning(
|
||||||
|
"No digest delivery rules configured (need rules with "
|
||||||
|
"trigger_type=='schedule' and schedule_match=='digest')"
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
for rule in rules:
|
||||||
|
try:
|
||||||
|
await self._deliver_to_rule(rule, digest, now)
|
||||||
|
except Exception:
|
||||||
|
self._logger.exception(
|
||||||
|
f"Digest delivery failed for rule {rule.name!r}"
|
||||||
|
)
|
||||||
|
|
||||||
|
async def _deliver_to_rule(self, rule, digest, now: float) -> None:
|
||||||
|
"""Hand the rendered digest to a channel based on rule.delivery_type."""
|
||||||
|
channel = self._channel_factory(rule)
|
||||||
|
delivery_type = rule.delivery_type
|
||||||
|
|
||||||
|
if delivery_type in ("mesh_broadcast", "mesh_dm"):
|
||||||
|
# One deliver call per chunk
|
||||||
|
chunks = digest.mesh_chunks
|
||||||
|
total = len(chunks)
|
||||||
|
for i, chunk in enumerate(chunks, start=1):
|
||||||
|
payload = {
|
||||||
|
"category": "digest",
|
||||||
|
"severity": "routine",
|
||||||
|
"message": chunk,
|
||||||
|
"node_id": None,
|
||||||
|
"region": None,
|
||||||
|
"timestamp": now,
|
||||||
|
"chunk_index": i,
|
||||||
|
"chunk_total": total,
|
||||||
|
}
|
||||||
|
channel.deliver(payload)
|
||||||
|
self._logger.info(
|
||||||
|
f"Delivered {total} mesh chunk(s) to rule {rule.name!r}"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
# Single full-form delivery
|
||||||
|
payload = {
|
||||||
|
"category": "digest",
|
||||||
|
"severity": "routine",
|
||||||
|
"message": digest.full,
|
||||||
|
"node_id": None,
|
||||||
|
"region": None,
|
||||||
|
"timestamp": now,
|
||||||
|
}
|
||||||
|
channel.deliver(payload)
|
||||||
|
self._logger.info(
|
||||||
|
f"Delivered digest to rule {rule.name!r} via {delivery_type}"
|
||||||
|
)
|
||||||
|
|
||||||
|
def _matching_rules(self) -> list:
|
||||||
|
"""Find enabled schedule rules tagged as digest deliveries."""
|
||||||
|
matches = []
|
||||||
|
for rule in self._config.notifications.rules:
|
||||||
|
if not rule.enabled:
|
||||||
|
continue
|
||||||
|
if rule.trigger_type != "schedule":
|
||||||
|
continue
|
||||||
|
# schedule_match is the discriminator. Operators set it to
|
||||||
|
# "digest" to receive the morning digest. Other values
|
||||||
|
# reserved for future schedule types.
|
||||||
|
schedule_match = getattr(rule, "schedule_match", None)
|
||||||
|
if schedule_match != "digest":
|
||||||
|
continue
|
||||||
|
matches.append(rule)
|
||||||
|
return matches
|
||||||
|
|
||||||
|
def _next_fire_at(self, now: float) -> float:
|
||||||
|
"""Compute the next epoch timestamp when the digest should fire.
|
||||||
|
|
||||||
|
Reads schedule HH:MM from config. If today's fire time has
|
||||||
|
already passed, returns tomorrow's. Uses local timezone.
|
||||||
|
"""
|
||||||
|
schedule_str = self._schedule_str()
|
||||||
|
h, m = self._parse_schedule(schedule_str)
|
||||||
|
now_dt = datetime.fromtimestamp(now)
|
||||||
|
target_today = now_dt.replace(hour=h, minute=m, second=0, microsecond=0)
|
||||||
|
if target_today.timestamp() <= now:
|
||||||
|
target = target_today + timedelta(days=1)
|
||||||
|
else:
|
||||||
|
target = target_today
|
||||||
|
return target.timestamp()
|
||||||
|
|
||||||
|
def _schedule_str(self) -> str:
|
||||||
|
digest_cfg = getattr(self._config.notifications, "digest", None)
|
||||||
|
if digest_cfg is None:
|
||||||
|
return "07:00"
|
||||||
|
return getattr(digest_cfg, "schedule", "07:00")
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _parse_schedule(s: str) -> tuple[int, int]:
|
||||||
|
"""Parse 'HH:MM' to (hour, minute). Falls back to 07:00 on bad input."""
|
||||||
|
try:
|
||||||
|
hh, mm = s.strip().split(":", 1)
|
||||||
|
h = int(hh)
|
||||||
|
m = int(mm)
|
||||||
|
if not (0 <= h <= 23 and 0 <= m <= 59):
|
||||||
|
raise ValueError(f"out of range: {s}")
|
||||||
|
return h, m
|
||||||
|
except (ValueError, AttributeError):
|
||||||
|
# Fall back to 07:00 rather than crash the loop
|
||||||
|
return 7, 0
|
||||||
|
|
||||||
|
def last_fire_at(self) -> float:
|
||||||
|
return self._last_fire_at
|
||||||
587
tests/test_pipeline_scheduler.py
Normal file
587
tests/test_pipeline_scheduler.py
Normal file
|
|
@ -0,0 +1,587 @@
|
||||||
|
"""Tests for DigestScheduler (Phase 2.3b).
|
||||||
|
|
||||||
|
Uses asyncio.run() since pytest-asyncio is not available in the container.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import time
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
from typing import Optional
|
||||||
|
from unittest.mock import MagicMock, call
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from meshai.notifications.events import make_event
|
||||||
|
from meshai.notifications.pipeline.digest import DigestAccumulator
|
||||||
|
from meshai.notifications.pipeline.scheduler import DigestScheduler
|
||||||
|
|
||||||
|
|
||||||
|
# ---- Test Fixtures ----
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class MockRule:
|
||||||
|
"""Mock notification rule for testing."""
|
||||||
|
name: str = "test-rule"
|
||||||
|
enabled: bool = True
|
||||||
|
trigger_type: str = "schedule"
|
||||||
|
schedule_match: str = "digest"
|
||||||
|
delivery_type: str = "mesh_broadcast"
|
||||||
|
broadcast_channel: int = 0
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class MockDigestConfig:
|
||||||
|
"""Mock digest config."""
|
||||||
|
schedule: str = "07:00"
|
||||||
|
include: list = field(default_factory=list)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class MockNotificationsConfig:
|
||||||
|
"""Mock notifications config."""
|
||||||
|
enabled: bool = True
|
||||||
|
digest: MockDigestConfig = field(default_factory=MockDigestConfig)
|
||||||
|
rules: list = field(default_factory=list)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class MockConfig:
|
||||||
|
"""Mock config for scheduler tests."""
|
||||||
|
notifications: MockNotificationsConfig = field(default_factory=MockNotificationsConfig)
|
||||||
|
|
||||||
|
|
||||||
|
class MockChannel:
|
||||||
|
"""Mock channel that records deliveries."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.deliveries = []
|
||||||
|
|
||||||
|
def deliver(self, payload: dict):
|
||||||
|
self.deliveries.append(payload)
|
||||||
|
|
||||||
|
|
||||||
|
def make_scheduler(
|
||||||
|
schedule: str = "07:00",
|
||||||
|
rules: Optional[list] = None,
|
||||||
|
clock: Optional[callable] = None,
|
||||||
|
sleep: Optional[callable] = None,
|
||||||
|
accumulator: Optional[DigestAccumulator] = None,
|
||||||
|
) -> tuple[DigestScheduler, MockConfig, dict]:
|
||||||
|
"""Factory for creating test schedulers.
|
||||||
|
|
||||||
|
Returns (scheduler, config, channels_by_rule_name).
|
||||||
|
"""
|
||||||
|
if rules is None:
|
||||||
|
rules = [MockRule()]
|
||||||
|
|
||||||
|
config = MockConfig(
|
||||||
|
notifications=MockNotificationsConfig(
|
||||||
|
digest=MockDigestConfig(schedule=schedule),
|
||||||
|
rules=rules,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
channels = {}
|
||||||
|
|
||||||
|
def channel_factory(rule):
|
||||||
|
ch = MockChannel()
|
||||||
|
channels[rule.name] = ch
|
||||||
|
return ch
|
||||||
|
|
||||||
|
if accumulator is None:
|
||||||
|
accumulator = DigestAccumulator()
|
||||||
|
|
||||||
|
scheduler = DigestScheduler(
|
||||||
|
accumulator=accumulator,
|
||||||
|
config=config,
|
||||||
|
channel_factory=channel_factory,
|
||||||
|
clock=clock,
|
||||||
|
sleep=sleep,
|
||||||
|
)
|
||||||
|
|
||||||
|
return scheduler, config, channels
|
||||||
|
|
||||||
|
|
||||||
|
# ---- Schedule Computation Tests ----
|
||||||
|
|
||||||
|
class TestScheduleComputation:
|
||||||
|
"""Tests for _next_fire_at and _parse_schedule."""
|
||||||
|
|
||||||
|
def test_parse_schedule_valid(self):
|
||||||
|
"""Valid HH:MM parses correctly."""
|
||||||
|
scheduler, _, _ = make_scheduler()
|
||||||
|
assert scheduler._parse_schedule("07:00") == (7, 0)
|
||||||
|
assert scheduler._parse_schedule("23:59") == (23, 59)
|
||||||
|
assert scheduler._parse_schedule("00:00") == (0, 0)
|
||||||
|
assert scheduler._parse_schedule("12:30") == (12, 30)
|
||||||
|
|
||||||
|
def test_parse_schedule_with_whitespace(self):
|
||||||
|
"""Whitespace is stripped."""
|
||||||
|
scheduler, _, _ = make_scheduler()
|
||||||
|
assert scheduler._parse_schedule(" 07:00 ") == (7, 0)
|
||||||
|
|
||||||
|
def test_parse_schedule_invalid_falls_back(self):
|
||||||
|
"""Invalid schedules fall back to 07:00."""
|
||||||
|
scheduler, _, _ = make_scheduler()
|
||||||
|
# Bad format
|
||||||
|
assert scheduler._parse_schedule("7:00:00") == (7, 0)
|
||||||
|
assert scheduler._parse_schedule("invalid") == (7, 0)
|
||||||
|
assert scheduler._parse_schedule("") == (7, 0)
|
||||||
|
# Out of range
|
||||||
|
assert scheduler._parse_schedule("25:00") == (7, 0)
|
||||||
|
assert scheduler._parse_schedule("12:60") == (7, 0)
|
||||||
|
|
||||||
|
def test_next_fire_at_future_today(self):
|
||||||
|
"""If schedule time is later today, returns today's timestamp."""
|
||||||
|
# Set clock to 06:00 on a known date
|
||||||
|
base_dt = datetime(2024, 6, 15, 6, 0, 0)
|
||||||
|
base_ts = base_dt.timestamp()
|
||||||
|
|
||||||
|
scheduler, _, _ = make_scheduler(schedule="07:00", clock=lambda: base_ts)
|
||||||
|
next_fire = scheduler._next_fire_at(base_ts)
|
||||||
|
|
||||||
|
# Should be 07:00 same day
|
||||||
|
expected_dt = datetime(2024, 6, 15, 7, 0, 0)
|
||||||
|
assert abs(next_fire - expected_dt.timestamp()) < 1
|
||||||
|
|
||||||
|
def test_next_fire_at_past_today_schedules_tomorrow(self):
|
||||||
|
"""If schedule time has passed today, returns tomorrow's timestamp."""
|
||||||
|
# Set clock to 08:00 on a known date
|
||||||
|
base_dt = datetime(2024, 6, 15, 8, 0, 0)
|
||||||
|
base_ts = base_dt.timestamp()
|
||||||
|
|
||||||
|
scheduler, _, _ = make_scheduler(schedule="07:00", clock=lambda: base_ts)
|
||||||
|
next_fire = scheduler._next_fire_at(base_ts)
|
||||||
|
|
||||||
|
# Should be 07:00 next day
|
||||||
|
expected_dt = datetime(2024, 6, 16, 7, 0, 0)
|
||||||
|
assert abs(next_fire - expected_dt.timestamp()) < 1
|
||||||
|
|
||||||
|
def test_next_fire_at_exact_time_schedules_tomorrow(self):
|
||||||
|
"""If clock is exactly at schedule time, schedules tomorrow."""
|
||||||
|
base_dt = datetime(2024, 6, 15, 7, 0, 0)
|
||||||
|
base_ts = base_dt.timestamp()
|
||||||
|
|
||||||
|
scheduler, _, _ = make_scheduler(schedule="07:00", clock=lambda: base_ts)
|
||||||
|
next_fire = scheduler._next_fire_at(base_ts)
|
||||||
|
|
||||||
|
# Should be 07:00 next day
|
||||||
|
expected_dt = datetime(2024, 6, 16, 7, 0, 0)
|
||||||
|
assert abs(next_fire - expected_dt.timestamp()) < 1
|
||||||
|
|
||||||
|
def test_schedule_str_reads_from_config(self):
|
||||||
|
"""_schedule_str reads from config.notifications.digest.schedule."""
|
||||||
|
scheduler, _, _ = make_scheduler(schedule="19:30")
|
||||||
|
assert scheduler._schedule_str() == "19:30"
|
||||||
|
|
||||||
|
def test_schedule_str_defaults_to_0700(self):
|
||||||
|
"""Missing digest config defaults to 07:00."""
|
||||||
|
config = MockConfig()
|
||||||
|
config.notifications.digest = None
|
||||||
|
|
||||||
|
scheduler = DigestScheduler(
|
||||||
|
accumulator=DigestAccumulator(),
|
||||||
|
config=config,
|
||||||
|
channel_factory=lambda r: MockChannel(),
|
||||||
|
)
|
||||||
|
assert scheduler._schedule_str() == "07:00"
|
||||||
|
|
||||||
|
|
||||||
|
# ---- Fire Behavior Tests ----
|
||||||
|
|
||||||
|
class TestFireBehavior:
|
||||||
|
"""Tests for _fire() digest delivery."""
|
||||||
|
|
||||||
|
def test_fire_delivers_to_matching_rule(self):
|
||||||
|
"""_fire() delivers digest to rules with schedule_match='digest'."""
|
||||||
|
accumulator = DigestAccumulator()
|
||||||
|
# Add an event so digest has content
|
||||||
|
accumulator.enqueue(make_event(
|
||||||
|
source="test",
|
||||||
|
category="weather_warning",
|
||||||
|
severity="priority",
|
||||||
|
title="Test Alert",
|
||||||
|
summary="Test alert summary",
|
||||||
|
))
|
||||||
|
|
||||||
|
scheduler, _, channels = make_scheduler(
|
||||||
|
rules=[MockRule(name="digest-mesh")],
|
||||||
|
accumulator=accumulator,
|
||||||
|
)
|
||||||
|
|
||||||
|
now = time.time()
|
||||||
|
|
||||||
|
async def run_fire():
|
||||||
|
await scheduler._fire(now)
|
||||||
|
|
||||||
|
asyncio.run(run_fire())
|
||||||
|
|
||||||
|
assert "digest-mesh" in channels
|
||||||
|
ch = channels["digest-mesh"]
|
||||||
|
assert len(ch.deliveries) == 1
|
||||||
|
payload = ch.deliveries[0]
|
||||||
|
assert payload["category"] == "digest"
|
||||||
|
assert payload["severity"] == "routine"
|
||||||
|
assert "Test alert" in payload["message"] or "Weather" in payload["message"]
|
||||||
|
|
||||||
|
def test_fire_skips_disabled_rules(self):
|
||||||
|
"""Disabled rules are not delivered to."""
|
||||||
|
scheduler, _, channels = make_scheduler(
|
||||||
|
rules=[MockRule(name="disabled", enabled=False)],
|
||||||
|
)
|
||||||
|
|
||||||
|
async def run_fire():
|
||||||
|
await scheduler._fire(time.time())
|
||||||
|
|
||||||
|
asyncio.run(run_fire())
|
||||||
|
|
||||||
|
# Channel should not be created for disabled rule
|
||||||
|
assert "disabled" not in channels
|
||||||
|
|
||||||
|
def test_fire_skips_non_schedule_rules(self):
|
||||||
|
"""Rules with trigger_type != 'schedule' are skipped."""
|
||||||
|
rule = MockRule(name="condition-rule", trigger_type="condition")
|
||||||
|
scheduler, _, channels = make_scheduler(rules=[rule])
|
||||||
|
|
||||||
|
async def run_fire():
|
||||||
|
await scheduler._fire(time.time())
|
||||||
|
|
||||||
|
asyncio.run(run_fire())
|
||||||
|
|
||||||
|
assert "condition-rule" not in channels
|
||||||
|
|
||||||
|
def test_fire_skips_non_digest_schedule_rules(self):
|
||||||
|
"""Schedule rules with schedule_match != 'digest' are skipped."""
|
||||||
|
rule = MockRule(name="other-schedule", schedule_match="daily_report")
|
||||||
|
scheduler, _, channels = make_scheduler(rules=[rule])
|
||||||
|
|
||||||
|
async def run_fire():
|
||||||
|
await scheduler._fire(time.time())
|
||||||
|
|
||||||
|
asyncio.run(run_fire())
|
||||||
|
|
||||||
|
assert "other-schedule" not in channels
|
||||||
|
|
||||||
|
def test_fire_mesh_delivery_chunks(self):
|
||||||
|
"""Mesh delivery types get per-chunk delivery."""
|
||||||
|
accumulator = DigestAccumulator(mesh_char_limit=100)
|
||||||
|
# Add multiple events to force chunking
|
||||||
|
for i in range(5):
|
||||||
|
accumulator.enqueue(make_event(
|
||||||
|
source="test",
|
||||||
|
category="weather_warning",
|
||||||
|
severity="priority",
|
||||||
|
title=f"Alert {i}",
|
||||||
|
summary=f"Weather alert number {i} with enough text to use space",
|
||||||
|
))
|
||||||
|
|
||||||
|
scheduler, _, channels = make_scheduler(
|
||||||
|
rules=[MockRule(name="mesh", delivery_type="mesh_broadcast")],
|
||||||
|
accumulator=accumulator,
|
||||||
|
)
|
||||||
|
|
||||||
|
now = time.time()
|
||||||
|
|
||||||
|
async def run_fire():
|
||||||
|
await scheduler._fire(now)
|
||||||
|
|
||||||
|
asyncio.run(run_fire())
|
||||||
|
|
||||||
|
ch = channels["mesh"]
|
||||||
|
# Should have multiple deliveries (one per chunk)
|
||||||
|
assert len(ch.deliveries) >= 1
|
||||||
|
# Check chunk metadata
|
||||||
|
for payload in ch.deliveries:
|
||||||
|
assert "chunk_index" in payload
|
||||||
|
assert "chunk_total" in payload
|
||||||
|
|
||||||
|
def test_fire_email_delivery_full_text(self):
|
||||||
|
"""Email delivery type gets single full-text delivery."""
|
||||||
|
accumulator = DigestAccumulator()
|
||||||
|
accumulator.enqueue(make_event(
|
||||||
|
source="test",
|
||||||
|
category="weather_warning",
|
||||||
|
severity="priority",
|
||||||
|
title="Test Alert",
|
||||||
|
summary="Test alert summary",
|
||||||
|
))
|
||||||
|
|
||||||
|
scheduler, _, channels = make_scheduler(
|
||||||
|
rules=[MockRule(name="email", delivery_type="email")],
|
||||||
|
accumulator=accumulator,
|
||||||
|
)
|
||||||
|
|
||||||
|
async def run_fire():
|
||||||
|
await scheduler._fire(time.time())
|
||||||
|
|
||||||
|
asyncio.run(run_fire())
|
||||||
|
|
||||||
|
ch = channels["email"]
|
||||||
|
assert len(ch.deliveries) == 1
|
||||||
|
payload = ch.deliveries[0]
|
||||||
|
assert "chunk_index" not in payload
|
||||||
|
assert "--- " in payload["message"] # Full format has header
|
||||||
|
|
||||||
|
def test_fire_updates_last_fire_at(self):
|
||||||
|
"""_fire() updates last_fire_at timestamp."""
|
||||||
|
scheduler, _, _ = make_scheduler()
|
||||||
|
assert scheduler.last_fire_at() == 0.0
|
||||||
|
|
||||||
|
now = time.time()
|
||||||
|
|
||||||
|
async def run_fire():
|
||||||
|
await scheduler._fire(now)
|
||||||
|
|
||||||
|
asyncio.run(run_fire())
|
||||||
|
|
||||||
|
assert scheduler.last_fire_at() == now
|
||||||
|
|
||||||
|
def test_fire_empty_digest_still_delivers(self):
|
||||||
|
"""Empty digest is still delivered (with 'no alerts' message)."""
|
||||||
|
scheduler, _, channels = make_scheduler(
|
||||||
|
rules=[MockRule(name="mesh")],
|
||||||
|
)
|
||||||
|
|
||||||
|
async def run_fire():
|
||||||
|
await scheduler._fire(time.time())
|
||||||
|
|
||||||
|
asyncio.run(run_fire())
|
||||||
|
|
||||||
|
ch = channels["mesh"]
|
||||||
|
assert len(ch.deliveries) == 1
|
||||||
|
assert "No alerts" in ch.deliveries[0]["message"]
|
||||||
|
|
||||||
|
|
||||||
|
# ---- Lifecycle Tests ----
|
||||||
|
|
||||||
|
class TestLifecycle:
|
||||||
|
"""Tests for start/stop lifecycle."""
|
||||||
|
|
||||||
|
def test_start_creates_task(self):
|
||||||
|
"""start() creates and runs an asyncio task."""
|
||||||
|
scheduler, _, _ = make_scheduler()
|
||||||
|
|
||||||
|
async def run_start():
|
||||||
|
await scheduler.start()
|
||||||
|
assert scheduler._task is not None
|
||||||
|
assert not scheduler._task.done()
|
||||||
|
await scheduler.stop()
|
||||||
|
|
||||||
|
asyncio.run(run_start())
|
||||||
|
|
||||||
|
def test_start_twice_raises(self):
|
||||||
|
"""Starting twice raises RuntimeError."""
|
||||||
|
scheduler, _, _ = make_scheduler()
|
||||||
|
|
||||||
|
async def run_double_start():
|
||||||
|
await scheduler.start()
|
||||||
|
try:
|
||||||
|
with pytest.raises(RuntimeError, match="already running"):
|
||||||
|
await scheduler.start()
|
||||||
|
finally:
|
||||||
|
await scheduler.stop()
|
||||||
|
|
||||||
|
asyncio.run(run_double_start())
|
||||||
|
|
||||||
|
def test_stop_cancels_task(self):
|
||||||
|
"""stop() cancels the running task."""
|
||||||
|
scheduler, _, _ = make_scheduler()
|
||||||
|
|
||||||
|
async def run_stop():
|
||||||
|
await scheduler.start()
|
||||||
|
task = scheduler._task
|
||||||
|
await scheduler.stop()
|
||||||
|
assert scheduler._task is None
|
||||||
|
assert task.done()
|
||||||
|
|
||||||
|
asyncio.run(run_stop())
|
||||||
|
|
||||||
|
def test_stop_idempotent(self):
|
||||||
|
"""stop() on non-running scheduler is safe."""
|
||||||
|
scheduler, _, _ = make_scheduler()
|
||||||
|
|
||||||
|
async def run_stop():
|
||||||
|
# Never started
|
||||||
|
await scheduler.stop()
|
||||||
|
# Should not raise
|
||||||
|
|
||||||
|
asyncio.run(run_stop())
|
||||||
|
|
||||||
|
def test_stop_event_interrupts_sleep(self):
|
||||||
|
"""stop() interrupts the sleep and exits cleanly."""
|
||||||
|
sleep_calls = []
|
||||||
|
|
||||||
|
async def fake_sleep(duration):
|
||||||
|
sleep_calls.append(duration)
|
||||||
|
# Actually sleep briefly so we can cancel
|
||||||
|
await asyncio.sleep(0.01)
|
||||||
|
|
||||||
|
# Set clock far from schedule time to get long sleep
|
||||||
|
base_dt = datetime(2024, 6, 15, 8, 0, 0)
|
||||||
|
scheduler, _, _ = make_scheduler(
|
||||||
|
schedule="07:00",
|
||||||
|
clock=lambda: base_dt.timestamp(),
|
||||||
|
sleep=fake_sleep,
|
||||||
|
)
|
||||||
|
|
||||||
|
async def run_test():
|
||||||
|
await scheduler.start()
|
||||||
|
# Give task time to enter sleep
|
||||||
|
await asyncio.sleep(0.05)
|
||||||
|
await scheduler.stop()
|
||||||
|
|
||||||
|
asyncio.run(run_test())
|
||||||
|
|
||||||
|
# Task should have exited cleanly
|
||||||
|
|
||||||
|
|
||||||
|
# ---- Integration Tests ----
|
||||||
|
|
||||||
|
class TestIntegration:
|
||||||
|
"""Integration tests with real timing (short intervals)."""
|
||||||
|
|
||||||
|
def test_scheduler_fires_on_schedule(self):
|
||||||
|
"""Scheduler fires when schedule time arrives."""
|
||||||
|
fire_times = []
|
||||||
|
accumulator = DigestAccumulator()
|
||||||
|
|
||||||
|
# Start at 06:59:59.95 (50ms before 07:00), delay will be ~50ms
|
||||||
|
clock_time = [datetime(2024, 6, 15, 6, 59, 59, 950000).timestamp()]
|
||||||
|
|
||||||
|
def fake_clock():
|
||||||
|
return clock_time[0]
|
||||||
|
|
||||||
|
scheduler, _, channels = make_scheduler(
|
||||||
|
schedule="07:00",
|
||||||
|
clock=fake_clock,
|
||||||
|
accumulator=accumulator,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Track when fire happens
|
||||||
|
original_fire = scheduler._fire
|
||||||
|
|
||||||
|
async def tracking_fire(now):
|
||||||
|
fire_times.append(now)
|
||||||
|
await original_fire(now)
|
||||||
|
# After first fire, advance clock so next cycle has long delay
|
||||||
|
clock_time[0] = datetime(2024, 6, 15, 8, 0, 0).timestamp()
|
||||||
|
|
||||||
|
scheduler._fire = tracking_fire
|
||||||
|
|
||||||
|
async def run_test():
|
||||||
|
await scheduler.start()
|
||||||
|
# Wait for the ~50ms delay plus some buffer
|
||||||
|
await asyncio.sleep(0.2)
|
||||||
|
await scheduler.stop()
|
||||||
|
|
||||||
|
asyncio.run(run_test())
|
||||||
|
|
||||||
|
# Should have fired once
|
||||||
|
assert len(fire_times) >= 1
|
||||||
|
|
||||||
|
def test_scheduler_multiple_rules(self):
|
||||||
|
"""Scheduler delivers to multiple matching rules."""
|
||||||
|
accumulator = DigestAccumulator()
|
||||||
|
accumulator.enqueue(make_event(
|
||||||
|
source="test",
|
||||||
|
category="weather_warning",
|
||||||
|
severity="priority",
|
||||||
|
title="Test",
|
||||||
|
summary="Test summary",
|
||||||
|
))
|
||||||
|
|
||||||
|
rules = [
|
||||||
|
MockRule(name="mesh1", delivery_type="mesh_broadcast"),
|
||||||
|
MockRule(name="mesh2", delivery_type="mesh_dm"),
|
||||||
|
MockRule(name="email", delivery_type="email"),
|
||||||
|
]
|
||||||
|
|
||||||
|
scheduler, _, channels = make_scheduler(
|
||||||
|
rules=rules,
|
||||||
|
accumulator=accumulator,
|
||||||
|
)
|
||||||
|
|
||||||
|
async def run_fire():
|
||||||
|
await scheduler._fire(time.time())
|
||||||
|
|
||||||
|
asyncio.run(run_fire())
|
||||||
|
|
||||||
|
# All three should have received deliveries
|
||||||
|
assert "mesh1" in channels
|
||||||
|
assert "mesh2" in channels
|
||||||
|
assert "email" in channels
|
||||||
|
assert len(channels["mesh1"].deliveries) >= 1
|
||||||
|
assert len(channels["mesh2"].deliveries) >= 1
|
||||||
|
assert len(channels["email"].deliveries) == 1
|
||||||
|
|
||||||
|
def test_scheduler_handles_delivery_error(self):
|
||||||
|
"""Scheduler continues after delivery error."""
|
||||||
|
accumulator = DigestAccumulator()
|
||||||
|
accumulator.enqueue(make_event(
|
||||||
|
source="test",
|
||||||
|
category="weather_warning",
|
||||||
|
severity="priority",
|
||||||
|
title="Test",
|
||||||
|
summary="Test",
|
||||||
|
))
|
||||||
|
|
||||||
|
rules = [
|
||||||
|
MockRule(name="bad"),
|
||||||
|
MockRule(name="good"),
|
||||||
|
]
|
||||||
|
|
||||||
|
call_order = []
|
||||||
|
|
||||||
|
def bad_channel_factory(rule):
|
||||||
|
call_order.append(rule.name)
|
||||||
|
if rule.name == "bad":
|
||||||
|
ch = MagicMock()
|
||||||
|
ch.deliver.side_effect = RuntimeError("delivery failed")
|
||||||
|
return ch
|
||||||
|
return MockChannel()
|
||||||
|
|
||||||
|
scheduler = DigestScheduler(
|
||||||
|
accumulator=accumulator,
|
||||||
|
config=MockConfig(
|
||||||
|
notifications=MockNotificationsConfig(rules=rules)
|
||||||
|
),
|
||||||
|
channel_factory=bad_channel_factory,
|
||||||
|
)
|
||||||
|
|
||||||
|
async def run_fire():
|
||||||
|
await scheduler._fire(time.time())
|
||||||
|
|
||||||
|
asyncio.run(run_fire())
|
||||||
|
|
||||||
|
# Both rules should have been attempted
|
||||||
|
assert "bad" in call_order
|
||||||
|
assert "good" in call_order
|
||||||
|
|
||||||
|
|
||||||
|
# ---- Matching Rules Tests ----
|
||||||
|
|
||||||
|
class TestMatchingRules:
|
||||||
|
"""Tests for _matching_rules() filter logic."""
|
||||||
|
|
||||||
|
def test_matching_rules_filters_correctly(self):
|
||||||
|
"""Only enabled schedule rules with schedule_match='digest' match."""
|
||||||
|
rules = [
|
||||||
|
MockRule(name="good", enabled=True, trigger_type="schedule", schedule_match="digest"),
|
||||||
|
MockRule(name="disabled", enabled=False, trigger_type="schedule", schedule_match="digest"),
|
||||||
|
MockRule(name="condition", enabled=True, trigger_type="condition", schedule_match="digest"),
|
||||||
|
MockRule(name="other-match", enabled=True, trigger_type="schedule", schedule_match="daily"),
|
||||||
|
MockRule(name="no-match", enabled=True, trigger_type="schedule", schedule_match=None),
|
||||||
|
]
|
||||||
|
|
||||||
|
scheduler, _, _ = make_scheduler(rules=rules)
|
||||||
|
matches = scheduler._matching_rules()
|
||||||
|
|
||||||
|
assert len(matches) == 1
|
||||||
|
assert matches[0].name == "good"
|
||||||
|
|
||||||
|
def test_matching_rules_empty_when_no_rules(self):
|
||||||
|
"""Returns empty list when no rules configured."""
|
||||||
|
scheduler, _, _ = make_scheduler(rules=[])
|
||||||
|
matches = scheduler._matching_rules()
|
||||||
|
assert matches == []
|
||||||
Loading…
Add table
Add a link
Reference in a new issue