build: normalize all line endings to LF

One-time renormalization pass under the .gitattributes added in the
previous commit. Every tracked text file now uses LF. No semantic
changes — verified via git diff --cached --ignore-all-space showing
zero real differences. Future diffs will only show real content
changes.

This commit will appear huge in git log --stat but represents zero
behavior change. Use git log --follow --ignore-all-space or
git blame -w when archaeologically tracing through this commit.
This commit is contained in:
K7ZVX 2026-05-14 22:43:06 +00:00
commit d6bc6b2b89
46 changed files with 11450 additions and 11450 deletions

View file

@ -1,334 +1,334 @@
"""Alert category registry.
Defines all alertable conditions with human-readable names, descriptions,
and example messages showing what users will receive.
Severity levels (military/intelligence precedence):
routine - Informational, no time pressure
priority - Needs attention soon
immediate - Act now, drop everything
Toggle categories (for v0.3 notification routing):
mesh_health - infrastructure, power, utilization, coverage, health-score
weather - NWS-sourced alerts, stream flooding
fire - NIFC perimeters, FIRMS hotspots
rf_propagation - solar, geomagnetic, ducting, band conditions
roads - 511, TomTom traffic
avalanche - avalanche advisories
seismic - USGS quakes (Phase 3)
tracking - ADS-B, AIS, satellite passes (Phase 7)
"""
from typing import Optional
# Valid toggle values for v0.3 pipeline
VALID_TOGGLES = frozenset({
"mesh_health",
"weather",
"fire",
"rf_propagation",
"roads",
"avalanche",
"seismic",
"tracking",
})
ALERT_CATEGORIES = {
# Infrastructure alerts
"infra_offline": {
"name": "Infrastructure Node Offline",
"description": "An infrastructure node (router/repeater) stopped responding",
"default_severity": "priority",
"example_message": "⚠ Infrastructure Offline: MHR — Mountain Harrison Rptr has not been heard for 2 hours",
"toggle": "mesh_health",
},
"critical_node_down": {
"name": "Critical Node Down",
"description": "A node you marked as critical went offline",
"default_severity": "immediate",
"example_message": "🚨 Critical Node Down: HPR — Hayden Peak Rptr offline for 1 hour",
"toggle": "mesh_health",
},
"infra_recovery": {
"name": "Infrastructure Recovery",
"description": "An offline infrastructure node came back online",
"default_severity": "routine",
"example_message": "✅ Recovery: MHR — Mountain Harrison Rptr back online after 2h outage",
"toggle": "mesh_health",
},
"new_router": {
"name": "New Router",
"description": "A new router appeared on the mesh",
"default_severity": "routine",
"example_message": "📡 New Router: Snake River Relay appeared in Wood River Valley",
"toggle": "mesh_health",
},
# Power alerts
"battery_warning": {
"name": "Battery Warning",
"description": "Infrastructure node battery below 30% (3.60V)",
"default_severity": "routine",
"example_message": "🔋 Battery Warning: BLD-MTN at 28% (3.58V), solar not charging",
"toggle": "mesh_health",
},
"battery_critical": {
"name": "Battery Critical",
"description": "Infrastructure node battery below 15% (3.50V)",
"default_severity": "priority",
"example_message": "🔋 Battery Critical: BLD-MTN at 12% (3.48V) — shutdown in hours",
"toggle": "mesh_health",
},
"battery_emergency": {
"name": "Battery Emergency",
"description": "Infrastructure node battery below 5% (3.40V) — shutdown imminent",
"default_severity": "immediate",
"example_message": "🚨 Battery Emergency: BLD-MTN at 4% (3.38V) — shutdown imminent",
"toggle": "mesh_health",
},
"battery_trend": {
"name": "Battery Declining",
"description": "Battery showing declining trend over 7 days — possible solar or charging issue",
"default_severity": "routine",
"example_message": "🔋 Battery Trend: HPR declining 85% → 62% over 7 days (-3.3%/day)",
"toggle": "mesh_health",
},
"power_source_change": {
"name": "Power Source Change",
"description": "Node switched from USB to battery — possible power outage at site",
"default_severity": "priority",
"example_message": "⚡ Power Source: MHR switched from USB to battery — possible outage",
"toggle": "mesh_health",
},
"solar_not_charging": {
"name": "Solar Not Charging",
"description": "Solar panel not charging during daylight hours — panel issue or obstruction",
"default_severity": "priority",
"example_message": "☀️ Solar Issue: BLD-MTN not charging during daylight (12:00 MDT)",
"toggle": "mesh_health",
},
# Utilization alerts
"high_utilization": {
"name": "Channel Airtime High",
"description": "LoRa channel airtime exceeding threshold — mesh congestion",
"default_severity": "routine",
"example_message": "📊 Channel Airtime: 47% utilization (threshold: 40%). Reliability may degrade.",
"toggle": "mesh_health",
},
"sustained_high_util": {
"name": "Sustained High Utilization",
"description": "Channel airtime elevated for extended period — ongoing congestion",
"default_severity": "priority",
"example_message": "📊 Sustained Congestion: 45% channel utilization for 2+ hours. Consider reducing telemetry.",
"toggle": "mesh_health",
},
"packet_flood": {
"name": "Packet Flood",
"description": "A single node sending excessive radio packets (NOT water flooding) — possible firmware bug or stuck transmitter",
"default_severity": "priority",
"example_message": "📻 Packet Flood: Node 'BKBS' transmitting 42 packets/min (threshold: 10/min). Firmware bug?",
"toggle": "mesh_health",
},
# Coverage alerts
"infra_single_gateway": {
"name": "Single Gateway",
"description": "Infrastructure node dropped to single gateway coverage — reduced redundancy",
"default_severity": "priority",
"example_message": "📶 Reduced Coverage: HPR dropped to single gateway. Previously had 3 paths.",
"toggle": "mesh_health",
},
"feeder_offline": {
"name": "Feeder Offline",
"description": "A feeder gateway stopped responding — coverage gap possible",
"default_severity": "priority",
"example_message": "📡 Feeder Offline: AIDA-N2 gateway not responding. 5 nodes may lose uplink.",
"toggle": "mesh_health",
},
"region_total_blackout": {
"name": "Region Blackout",
"description": "All infrastructure in a region is offline — complete coverage loss",
"default_severity": "immediate",
"example_message": "🚨 REGION BLACKOUT: All infrastructure in Magic Valley offline!",
"toggle": "mesh_health",
},
# Health score alerts
"mesh_score_low": {
"name": "Mesh Health Low",
"description": "Overall mesh health score dropped below threshold — multiple issues likely",
"default_severity": "priority",
"example_message": "📉 Mesh Health: Score 62/100 (threshold: 65). Infrastructure: 71, Connectivity: 58.",
"toggle": "mesh_health",
},
"region_score_low": {
"name": "Region Health Low",
"description": "A region's health score below threshold — localized issues",
"default_severity": "priority",
"example_message": "📉 Region Health: Magic Valley at 55/100 (threshold: 60). 2 nodes offline.",
"toggle": "mesh_health",
},
# Environmental - Weather
"weather_warning": {
"name": "Severe Weather",
"description": "NWS warning or advisory affecting your mesh area",
"default_severity": "priority",
"example_message": "⚠ Red Flag Warning — Twin Falls, Cassia counties. Gusty winds, low humidity. Until May 13 04:00Z",
"toggle": "weather",
},
# Environmental - Space Weather
"hf_blackout": {
"name": "HF Radio Blackout",
"description": "R3+ solar flare degrading HF propagation on sunlit side",
"default_severity": "priority",
"example_message": "⚠ R3 Strong Radio Blackout — X1.2 flare. Wide-area HF blackout ~1 hour on sunlit side.",
"toggle": "rf_propagation",
},
"geomagnetic_storm": {
"name": "Geomagnetic Storm",
"description": "G2+ geomagnetic storm — HF degraded at higher latitudes, aurora possible",
"default_severity": "priority",
"example_message": "🌐 G2 Moderate Geomagnetic Storm — Kp=6. HF fades at high latitudes, aurora to ~55°.",
"toggle": "rf_propagation",
},
# Environmental - Tropospheric
"tropospheric_ducting": {
"name": "Tropospheric Ducting",
"description": "Atmospheric conditions trapping VHF/UHF signals — extended range",
"default_severity": "routine",
"example_message": "📡 Tropospheric Ducting: Surface duct detected, dM/dz -45 M-units/km, ~120m thick. VHF/UHF extended range.",
"toggle": "rf_propagation",
},
# Environmental - Fire
"fire_proximity": {
"name": "Fire Near Mesh",
"description": "Active wildfire within alert radius of mesh infrastructure",
"default_severity": "priority",
"example_message": "🔥 Fire Near Mesh: Rock Creek Fire — 1,240 ac, 15% contained, 12 km SSW of MHR. Monitor closely.",
"toggle": "fire",
},
"wildfire_proximity": {
"name": "Fire Near Mesh",
"description": "Active wildfire within alert radius of mesh infrastructure",
"default_severity": "priority",
"example_message": "🔥 Fire Near Mesh: Rock Creek Fire — 1,240 ac, 15% contained, 12 km SSW of MHR.",
"toggle": "fire",
},
"new_ignition": {
"name": "New Fire Ignition",
"description": "Satellite hotspot detected NOT near any known fire — potential new wildfire",
"default_severity": "priority",
"example_message": "🛰 New Ignition: Satellite fire at 42.32°N, 114.30°W — high confidence, 47 MW FRP. Not near any known fire.",
"toggle": "fire",
},
# Environmental - Flood
"stream_flood_warning": {
"name": "Stream Flood Warning",
"description": "River gauge exceeds NWS flood stage threshold",
"default_severity": "priority",
"example_message": "🌊 Stream Flood Warning: Snake River nr Twin Falls at 12.8 ft — Minor Flood Stage is 10.5 ft.",
"toggle": "weather",
},
"stream_high_water": {
"name": "Stream High Water",
"description": "River gauge approaching flood stage — monitoring recommended",
"default_severity": "routine",
"example_message": "🌊 High Water: Snake River at 9.8 ft — Action Stage is 9.0 ft. Monitor conditions.",
"toggle": "weather",
},
# Environmental - Roads
"road_closure": {
"name": "Road Closure",
"description": "Full road closure on a monitored corridor",
"default_severity": "priority",
"example_message": "🚧 Road Closure: I-84 EB at MP 173 — full closure, construction. Detour via US-30.",
"toggle": "roads",
},
"traffic_congestion": {
"name": "Traffic Congestion",
"description": "Traffic speed dropped below congestion threshold on a monitored corridor",
"default_severity": "routine",
"example_message": "🚗 Traffic Congestion: I-84 Twin Falls — 35 mph (free-flow 70 mph), 50% speed ratio",
"toggle": "roads",
},
# Environmental - Avalanche
"avalanche_warning": {
"name": "Avalanche Danger High",
"description": "Avalanche danger level 4 (High) or 5 (Extreme) in your area",
"default_severity": "priority",
"example_message": "⛷ Avalanche Danger HIGH: Sawtooth Zone — avoid avalanche terrain. Natural avalanches likely.",
"toggle": "avalanche",
},
"avalanche_considerable": {
"name": "Avalanche Danger Considerable",
"description": "Avalanche danger level 3 (Considerable) — most fatalities occur at this level",
"default_severity": "routine",
"example_message": "⛷ Avalanche Danger CONSIDERABLE: Sawtooth Zone — dangerous conditions on steep slopes.",
"toggle": "avalanche",
},
}
def get_category(category_id: str) -> dict:
"""Get category info by ID, with fallback for unknown categories."""
if category_id in ALERT_CATEGORIES:
return ALERT_CATEGORIES[category_id]
return {
"name": category_id.replace("_", " ").title(),
"description": f"Alert type: {category_id}",
"default_severity": "routine",
"example_message": f"Alert: {category_id}",
"toggle": "mesh_health", # Default unknown to mesh_health
}
def list_categories() -> list[dict]:
"""List all categories with their IDs."""
return [
{"id": cat_id, **cat_info}
for cat_id, cat_info in ALERT_CATEGORIES.items()
]
def categories_for_toggle(toggle: str) -> list[str]:
"""Return all category names that route to this toggle.
Args:
toggle: Toggle name (e.g., "mesh_health", "weather")
Returns:
List of category IDs that have this toggle assigned
"""
if toggle not in VALID_TOGGLES:
return []
return [
cat_id
for cat_id, cat_info in ALERT_CATEGORIES.items()
if cat_info.get("toggle") == toggle
]
def get_toggle(category_name: str) -> Optional[str]:
"""Return the toggle name for a category, or None if unknown.
Args:
category_name: Category ID (e.g., "infra_offline")
Returns:
Toggle name (e.g., "mesh_health") or None if category unknown
"""
cat_info = ALERT_CATEGORIES.get(category_name)
if cat_info:
return cat_info.get("toggle")
return None
"""Alert category registry.
Defines all alertable conditions with human-readable names, descriptions,
and example messages showing what users will receive.
Severity levels (military/intelligence precedence):
routine - Informational, no time pressure
priority - Needs attention soon
immediate - Act now, drop everything
Toggle categories (for v0.3 notification routing):
mesh_health - infrastructure, power, utilization, coverage, health-score
weather - NWS-sourced alerts, stream flooding
fire - NIFC perimeters, FIRMS hotspots
rf_propagation - solar, geomagnetic, ducting, band conditions
roads - 511, TomTom traffic
avalanche - avalanche advisories
seismic - USGS quakes (Phase 3)
tracking - ADS-B, AIS, satellite passes (Phase 7)
"""
from typing import Optional
# Valid toggle values for v0.3 pipeline
VALID_TOGGLES = frozenset({
"mesh_health",
"weather",
"fire",
"rf_propagation",
"roads",
"avalanche",
"seismic",
"tracking",
})
ALERT_CATEGORIES = {
# Infrastructure alerts
"infra_offline": {
"name": "Infrastructure Node Offline",
"description": "An infrastructure node (router/repeater) stopped responding",
"default_severity": "priority",
"example_message": "⚠ Infrastructure Offline: MHR — Mountain Harrison Rptr has not been heard for 2 hours",
"toggle": "mesh_health",
},
"critical_node_down": {
"name": "Critical Node Down",
"description": "A node you marked as critical went offline",
"default_severity": "immediate",
"example_message": "🚨 Critical Node Down: HPR — Hayden Peak Rptr offline for 1 hour",
"toggle": "mesh_health",
},
"infra_recovery": {
"name": "Infrastructure Recovery",
"description": "An offline infrastructure node came back online",
"default_severity": "routine",
"example_message": "✅ Recovery: MHR — Mountain Harrison Rptr back online after 2h outage",
"toggle": "mesh_health",
},
"new_router": {
"name": "New Router",
"description": "A new router appeared on the mesh",
"default_severity": "routine",
"example_message": "📡 New Router: Snake River Relay appeared in Wood River Valley",
"toggle": "mesh_health",
},
# Power alerts
"battery_warning": {
"name": "Battery Warning",
"description": "Infrastructure node battery below 30% (3.60V)",
"default_severity": "routine",
"example_message": "🔋 Battery Warning: BLD-MTN at 28% (3.58V), solar not charging",
"toggle": "mesh_health",
},
"battery_critical": {
"name": "Battery Critical",
"description": "Infrastructure node battery below 15% (3.50V)",
"default_severity": "priority",
"example_message": "🔋 Battery Critical: BLD-MTN at 12% (3.48V) — shutdown in hours",
"toggle": "mesh_health",
},
"battery_emergency": {
"name": "Battery Emergency",
"description": "Infrastructure node battery below 5% (3.40V) — shutdown imminent",
"default_severity": "immediate",
"example_message": "🚨 Battery Emergency: BLD-MTN at 4% (3.38V) — shutdown imminent",
"toggle": "mesh_health",
},
"battery_trend": {
"name": "Battery Declining",
"description": "Battery showing declining trend over 7 days — possible solar or charging issue",
"default_severity": "routine",
"example_message": "🔋 Battery Trend: HPR declining 85% → 62% over 7 days (-3.3%/day)",
"toggle": "mesh_health",
},
"power_source_change": {
"name": "Power Source Change",
"description": "Node switched from USB to battery — possible power outage at site",
"default_severity": "priority",
"example_message": "⚡ Power Source: MHR switched from USB to battery — possible outage",
"toggle": "mesh_health",
},
"solar_not_charging": {
"name": "Solar Not Charging",
"description": "Solar panel not charging during daylight hours — panel issue or obstruction",
"default_severity": "priority",
"example_message": "☀️ Solar Issue: BLD-MTN not charging during daylight (12:00 MDT)",
"toggle": "mesh_health",
},
# Utilization alerts
"high_utilization": {
"name": "Channel Airtime High",
"description": "LoRa channel airtime exceeding threshold — mesh congestion",
"default_severity": "routine",
"example_message": "📊 Channel Airtime: 47% utilization (threshold: 40%). Reliability may degrade.",
"toggle": "mesh_health",
},
"sustained_high_util": {
"name": "Sustained High Utilization",
"description": "Channel airtime elevated for extended period — ongoing congestion",
"default_severity": "priority",
"example_message": "📊 Sustained Congestion: 45% channel utilization for 2+ hours. Consider reducing telemetry.",
"toggle": "mesh_health",
},
"packet_flood": {
"name": "Packet Flood",
"description": "A single node sending excessive radio packets (NOT water flooding) — possible firmware bug or stuck transmitter",
"default_severity": "priority",
"example_message": "📻 Packet Flood: Node 'BKBS' transmitting 42 packets/min (threshold: 10/min). Firmware bug?",
"toggle": "mesh_health",
},
# Coverage alerts
"infra_single_gateway": {
"name": "Single Gateway",
"description": "Infrastructure node dropped to single gateway coverage — reduced redundancy",
"default_severity": "priority",
"example_message": "📶 Reduced Coverage: HPR dropped to single gateway. Previously had 3 paths.",
"toggle": "mesh_health",
},
"feeder_offline": {
"name": "Feeder Offline",
"description": "A feeder gateway stopped responding — coverage gap possible",
"default_severity": "priority",
"example_message": "📡 Feeder Offline: AIDA-N2 gateway not responding. 5 nodes may lose uplink.",
"toggle": "mesh_health",
},
"region_total_blackout": {
"name": "Region Blackout",
"description": "All infrastructure in a region is offline — complete coverage loss",
"default_severity": "immediate",
"example_message": "🚨 REGION BLACKOUT: All infrastructure in Magic Valley offline!",
"toggle": "mesh_health",
},
# Health score alerts
"mesh_score_low": {
"name": "Mesh Health Low",
"description": "Overall mesh health score dropped below threshold — multiple issues likely",
"default_severity": "priority",
"example_message": "📉 Mesh Health: Score 62/100 (threshold: 65). Infrastructure: 71, Connectivity: 58.",
"toggle": "mesh_health",
},
"region_score_low": {
"name": "Region Health Low",
"description": "A region's health score below threshold — localized issues",
"default_severity": "priority",
"example_message": "📉 Region Health: Magic Valley at 55/100 (threshold: 60). 2 nodes offline.",
"toggle": "mesh_health",
},
# Environmental - Weather
"weather_warning": {
"name": "Severe Weather",
"description": "NWS warning or advisory affecting your mesh area",
"default_severity": "priority",
"example_message": "⚠ Red Flag Warning — Twin Falls, Cassia counties. Gusty winds, low humidity. Until May 13 04:00Z",
"toggle": "weather",
},
# Environmental - Space Weather
"hf_blackout": {
"name": "HF Radio Blackout",
"description": "R3+ solar flare degrading HF propagation on sunlit side",
"default_severity": "priority",
"example_message": "⚠ R3 Strong Radio Blackout — X1.2 flare. Wide-area HF blackout ~1 hour on sunlit side.",
"toggle": "rf_propagation",
},
"geomagnetic_storm": {
"name": "Geomagnetic Storm",
"description": "G2+ geomagnetic storm — HF degraded at higher latitudes, aurora possible",
"default_severity": "priority",
"example_message": "🌐 G2 Moderate Geomagnetic Storm — Kp=6. HF fades at high latitudes, aurora to ~55°.",
"toggle": "rf_propagation",
},
# Environmental - Tropospheric
"tropospheric_ducting": {
"name": "Tropospheric Ducting",
"description": "Atmospheric conditions trapping VHF/UHF signals — extended range",
"default_severity": "routine",
"example_message": "📡 Tropospheric Ducting: Surface duct detected, dM/dz -45 M-units/km, ~120m thick. VHF/UHF extended range.",
"toggle": "rf_propagation",
},
# Environmental - Fire
"fire_proximity": {
"name": "Fire Near Mesh",
"description": "Active wildfire within alert radius of mesh infrastructure",
"default_severity": "priority",
"example_message": "🔥 Fire Near Mesh: Rock Creek Fire — 1,240 ac, 15% contained, 12 km SSW of MHR. Monitor closely.",
"toggle": "fire",
},
"wildfire_proximity": {
"name": "Fire Near Mesh",
"description": "Active wildfire within alert radius of mesh infrastructure",
"default_severity": "priority",
"example_message": "🔥 Fire Near Mesh: Rock Creek Fire — 1,240 ac, 15% contained, 12 km SSW of MHR.",
"toggle": "fire",
},
"new_ignition": {
"name": "New Fire Ignition",
"description": "Satellite hotspot detected NOT near any known fire — potential new wildfire",
"default_severity": "priority",
"example_message": "🛰 New Ignition: Satellite fire at 42.32°N, 114.30°W — high confidence, 47 MW FRP. Not near any known fire.",
"toggle": "fire",
},
# Environmental - Flood
"stream_flood_warning": {
"name": "Stream Flood Warning",
"description": "River gauge exceeds NWS flood stage threshold",
"default_severity": "priority",
"example_message": "🌊 Stream Flood Warning: Snake River nr Twin Falls at 12.8 ft — Minor Flood Stage is 10.5 ft.",
"toggle": "weather",
},
"stream_high_water": {
"name": "Stream High Water",
"description": "River gauge approaching flood stage — monitoring recommended",
"default_severity": "routine",
"example_message": "🌊 High Water: Snake River at 9.8 ft — Action Stage is 9.0 ft. Monitor conditions.",
"toggle": "weather",
},
# Environmental - Roads
"road_closure": {
"name": "Road Closure",
"description": "Full road closure on a monitored corridor",
"default_severity": "priority",
"example_message": "🚧 Road Closure: I-84 EB at MP 173 — full closure, construction. Detour via US-30.",
"toggle": "roads",
},
"traffic_congestion": {
"name": "Traffic Congestion",
"description": "Traffic speed dropped below congestion threshold on a monitored corridor",
"default_severity": "routine",
"example_message": "🚗 Traffic Congestion: I-84 Twin Falls — 35 mph (free-flow 70 mph), 50% speed ratio",
"toggle": "roads",
},
# Environmental - Avalanche
"avalanche_warning": {
"name": "Avalanche Danger High",
"description": "Avalanche danger level 4 (High) or 5 (Extreme) in your area",
"default_severity": "priority",
"example_message": "⛷ Avalanche Danger HIGH: Sawtooth Zone — avoid avalanche terrain. Natural avalanches likely.",
"toggle": "avalanche",
},
"avalanche_considerable": {
"name": "Avalanche Danger Considerable",
"description": "Avalanche danger level 3 (Considerable) — most fatalities occur at this level",
"default_severity": "routine",
"example_message": "⛷ Avalanche Danger CONSIDERABLE: Sawtooth Zone — dangerous conditions on steep slopes.",
"toggle": "avalanche",
},
}
def get_category(category_id: str) -> dict:
"""Get category info by ID, with fallback for unknown categories."""
if category_id in ALERT_CATEGORIES:
return ALERT_CATEGORIES[category_id]
return {
"name": category_id.replace("_", " ").title(),
"description": f"Alert type: {category_id}",
"default_severity": "routine",
"example_message": f"Alert: {category_id}",
"toggle": "mesh_health", # Default unknown to mesh_health
}
def list_categories() -> list[dict]:
"""List all categories with their IDs."""
return [
{"id": cat_id, **cat_info}
for cat_id, cat_info in ALERT_CATEGORIES.items()
]
def categories_for_toggle(toggle: str) -> list[str]:
"""Return all category names that route to this toggle.
Args:
toggle: Toggle name (e.g., "mesh_health", "weather")
Returns:
List of category IDs that have this toggle assigned
"""
if toggle not in VALID_TOGGLES:
return []
return [
cat_id
for cat_id, cat_info in ALERT_CATEGORIES.items()
if cat_info.get("toggle") == toggle
]
def get_toggle(category_name: str) -> Optional[str]:
"""Return the toggle name for a category, or None if unknown.
Args:
category_name: Category ID (e.g., "infra_offline")
Returns:
Toggle name (e.g., "mesh_health") or None if category unknown
"""
cat_info = ALERT_CATEGORIES.get(category_name)
if cat_info:
return cat_info.get("toggle")
return None

View file

@ -1,186 +1,186 @@
"""Event dataclass for the v0.3 notification pipeline.
This module defines the unified Event shape that flows through the
notification routing pipeline. All adapters emit Events, and the
router consumes them.
Usage:
from meshai.notifications.events import Event, make_event
# Create an event
event = make_event(
source="nws",
category="tornado_warning",
severity="immediate",
title="Tornado Warning for Ada County",
summary="A tornado warning has been issued...",
lat=43.615,
lon=-116.2023,
)
# Serialize for storage/webhook
data = event.to_dict()
# Restore from storage
event2 = Event.from_dict(data)
"""
import hashlib
import time
from dataclasses import dataclass, field, asdict
from typing import Optional, Any
# Valid severity levels
SEVERITY_LEVELS = frozenset({"routine", "priority", "immediate"})
@dataclass
class Event:
"""Unified event shape for the notification pipeline.
All adapters (NWS, FIRMS, alert_engine, etc.) emit Events.
The router consumes Events and dispatches them to channels.
"""
# Identity
id: str = "" # stable hash for dedup, computed if not provided
source: str = "" # adapter name: "nws", "firms", "alert_engine", etc.
category: str = "" # specific event type within source
# Severity
severity: str = "routine" # "routine" | "priority" | "immediate"
# Geography
region: Optional[str] = None # primary region name, set by region tagger
regions: list[str] = field(default_factory=list) # all regions touched
lat: Optional[float] = None
lon: Optional[float] = None
nws_zones: list[str] = field(default_factory=list) # NWS zone codes
# Content
title: str = "" # one-line summary for digest headers
summary: str = "" # 1-3 sentence summary for immediate/mesh delivery
body: str = "" # full content for email/webhook delivery
# Affected entities (for mesh health events)
node_ids: list[str] = field(default_factory=list)
short_names: list[str] = field(default_factory=list)
# Timing
timestamp: float = 0.0 # event creation time
effective: Optional[float] = None # event start (NWS-style)
expires: Optional[float] = None # event end (NWS-style)
# Routing hints
group_key: Optional[str] = None # events with same key get merged
inhibit_keys: list[str] = field(default_factory=list) # suppression keys
# Raw adapter data (preserved for advanced rendering)
data: dict = field(default_factory=dict)
@staticmethod
def compute_id(
source: str,
category: str,
group_key: Optional[str] = None,
lat: Optional[float] = None,
lon: Optional[float] = None,
) -> str:
"""Compute a stable dedup ID for an event.
Two events with the same source+category+group_key+location
will have the same ID and can be deduplicated.
Args:
source: Adapter name
category: Event category
group_key: Optional grouping key
lat: Optional latitude
lon: Optional longitude
Returns:
16-character hex ID
"""
key_parts = [
source,
category,
group_key or "",
str(lat) if lat is not None else "",
str(lon) if lon is not None else "",
]
key_string = ":".join(key_parts)
return hashlib.sha1(key_string.encode()).hexdigest()[:16]
def to_dict(self) -> dict[str, Any]:
"""Serialize event to a dict for JSON storage/webhook.
Returns:
Dict representation of the event
"""
return asdict(self)
@classmethod
def from_dict(cls, d: dict[str, Any]) -> "Event":
"""Restore an Event from a dict.
Args:
d: Dict representation (from to_dict or JSON load)
Returns:
Event instance
"""
return cls(**d)
def make_event(
source: str,
category: str,
severity: str,
**kwargs: Any,
) -> Event:
"""Create an Event with automatic ID and timestamp.
This is the primary factory function for creating events.
It auto-computes the ID if not provided and sets timestamp
to the current time if not provided.
Args:
source: Adapter name (e.g., "nws", "firms", "alert_engine")
category: Event category (e.g., "tornado_warning", "infra_offline")
severity: One of "routine", "priority", "immediate"
**kwargs: Additional Event fields
Returns:
Event instance
Raises:
ValueError: If severity is not valid
"""
# Validate severity
if severity not in SEVERITY_LEVELS:
raise ValueError(
f"Invalid severity '{severity}'. "
f"Must be one of: {', '.join(sorted(SEVERITY_LEVELS))}"
)
# Auto-set timestamp if not provided
if "timestamp" not in kwargs or kwargs["timestamp"] == 0.0:
kwargs["timestamp"] = time.time()
# Auto-compute ID if not provided
if "id" not in kwargs or not kwargs["id"]:
kwargs["id"] = Event.compute_id(
source=source,
category=category,
group_key=kwargs.get("group_key"),
lat=kwargs.get("lat"),
lon=kwargs.get("lon"),
)
return Event(
source=source,
category=category,
severity=severity,
**kwargs,
)
"""Event dataclass for the v0.3 notification pipeline.
This module defines the unified Event shape that flows through the
notification routing pipeline. All adapters emit Events, and the
router consumes them.
Usage:
from meshai.notifications.events import Event, make_event
# Create an event
event = make_event(
source="nws",
category="tornado_warning",
severity="immediate",
title="Tornado Warning for Ada County",
summary="A tornado warning has been issued...",
lat=43.615,
lon=-116.2023,
)
# Serialize for storage/webhook
data = event.to_dict()
# Restore from storage
event2 = Event.from_dict(data)
"""
import hashlib
import time
from dataclasses import dataclass, field, asdict
from typing import Optional, Any
# Valid severity levels
SEVERITY_LEVELS = frozenset({"routine", "priority", "immediate"})
@dataclass
class Event:
"""Unified event shape for the notification pipeline.
All adapters (NWS, FIRMS, alert_engine, etc.) emit Events.
The router consumes Events and dispatches them to channels.
"""
# Identity
id: str = "" # stable hash for dedup, computed if not provided
source: str = "" # adapter name: "nws", "firms", "alert_engine", etc.
category: str = "" # specific event type within source
# Severity
severity: str = "routine" # "routine" | "priority" | "immediate"
# Geography
region: Optional[str] = None # primary region name, set by region tagger
regions: list[str] = field(default_factory=list) # all regions touched
lat: Optional[float] = None
lon: Optional[float] = None
nws_zones: list[str] = field(default_factory=list) # NWS zone codes
# Content
title: str = "" # one-line summary for digest headers
summary: str = "" # 1-3 sentence summary for immediate/mesh delivery
body: str = "" # full content for email/webhook delivery
# Affected entities (for mesh health events)
node_ids: list[str] = field(default_factory=list)
short_names: list[str] = field(default_factory=list)
# Timing
timestamp: float = 0.0 # event creation time
effective: Optional[float] = None # event start (NWS-style)
expires: Optional[float] = None # event end (NWS-style)
# Routing hints
group_key: Optional[str] = None # events with same key get merged
inhibit_keys: list[str] = field(default_factory=list) # suppression keys
# Raw adapter data (preserved for advanced rendering)
data: dict = field(default_factory=dict)
@staticmethod
def compute_id(
source: str,
category: str,
group_key: Optional[str] = None,
lat: Optional[float] = None,
lon: Optional[float] = None,
) -> str:
"""Compute a stable dedup ID for an event.
Two events with the same source+category+group_key+location
will have the same ID and can be deduplicated.
Args:
source: Adapter name
category: Event category
group_key: Optional grouping key
lat: Optional latitude
lon: Optional longitude
Returns:
16-character hex ID
"""
key_parts = [
source,
category,
group_key or "",
str(lat) if lat is not None else "",
str(lon) if lon is not None else "",
]
key_string = ":".join(key_parts)
return hashlib.sha1(key_string.encode()).hexdigest()[:16]
def to_dict(self) -> dict[str, Any]:
"""Serialize event to a dict for JSON storage/webhook.
Returns:
Dict representation of the event
"""
return asdict(self)
@classmethod
def from_dict(cls, d: dict[str, Any]) -> "Event":
"""Restore an Event from a dict.
Args:
d: Dict representation (from to_dict or JSON load)
Returns:
Event instance
"""
return cls(**d)
def make_event(
source: str,
category: str,
severity: str,
**kwargs: Any,
) -> Event:
"""Create an Event with automatic ID and timestamp.
This is the primary factory function for creating events.
It auto-computes the ID if not provided and sets timestamp
to the current time if not provided.
Args:
source: Adapter name (e.g., "nws", "firms", "alert_engine")
category: Event category (e.g., "tornado_warning", "infra_offline")
severity: One of "routine", "priority", "immediate"
**kwargs: Additional Event fields
Returns:
Event instance
Raises:
ValueError: If severity is not valid
"""
# Validate severity
if severity not in SEVERITY_LEVELS:
raise ValueError(
f"Invalid severity '{severity}'. "
f"Must be one of: {', '.join(sorted(SEVERITY_LEVELS))}"
)
# Auto-set timestamp if not provided
if "timestamp" not in kwargs or kwargs["timestamp"] == 0.0:
kwargs["timestamp"] = time.time()
# Auto-compute ID if not provided
if "id" not in kwargs or not kwargs["id"]:
kwargs["id"] = Event.compute_id(
source=source,
category=category,
group_key=kwargs.get("group_key"),
lat=kwargs.get("lat"),
lon=kwargs.get("lon"),
)
return Event(
source=source,
category=category,
severity=severity,
**kwargs,
)

View file

@ -1,154 +1,154 @@
"""Notification pipeline package.
Phase 2.1 + 2.2 + 2.3a + 2.3b:
- EventBus: pub/sub ingress
- Inhibitor: suppresses redundant events by inhibit_keys
- Grouper: coalesces events sharing group_key within a window
- SeverityRouter: forks immediate vs digest
- Dispatcher: routes immediate via channels (existing rules schema)
- DigestAccumulator: tracks priority/routine events for periodic digest
- DigestScheduler: fires digest at configured time (Phase 2.3b)
Usage:
from meshai.notifications.pipeline import build_pipeline, start_pipeline, stop_pipeline
bus = build_pipeline(config)
bus.emit(event)
# Async lifecycle
scheduler = await start_pipeline(bus, config)
...
await stop_pipeline(scheduler)
"""
from meshai.notifications.channels import create_channel
from meshai.notifications.pipeline.bus import EventBus, get_bus
from meshai.notifications.pipeline.severity_router import (
SeverityRouter,
StubDigestQueue, # kept for Phase 2.1 backward-compat tests
)
from meshai.notifications.pipeline.dispatcher import Dispatcher
from meshai.notifications.pipeline.inhibitor import Inhibitor
from meshai.notifications.pipeline.grouper import Grouper
from meshai.notifications.pipeline.digest import DigestAccumulator, Digest
from meshai.notifications.pipeline.scheduler import DigestScheduler
def build_pipeline(config) -> EventBus:
"""Build the pipeline and return the EventBus.
Components are stashed on bus._pipeline_components for lifecycle use.
"""
bus = EventBus()
dispatcher = Dispatcher(config, create_channel)
# Build include_toggles from config
digest_cfg = getattr(config.notifications, "digest", None)
include_toggles = None
if digest_cfg is not None:
include_list = getattr(digest_cfg, "include", None)
if include_list:
include_toggles = list(include_list)
digest = DigestAccumulator(include_toggles=include_toggles)
severity_router = SeverityRouter(
immediate_handler=dispatcher.dispatch,
digest_handler=digest.enqueue,
)
grouper = Grouper(next_handler=severity_router.handle)
inhibitor = Inhibitor(next_handler=grouper.handle)
bus.subscribe(inhibitor.handle)
# Stash components for lifecycle management
bus._pipeline_components = {
"inhibitor": inhibitor,
"grouper": grouper,
"severity_router": severity_router,
"dispatcher": dispatcher,
"digest": digest,
}
return bus
def build_pipeline_components(config) -> tuple:
"""Like build_pipeline, but returns all components for tests.
Returns (bus, inhibitor, grouper, severity_router, dispatcher, digest).
"""
bus = EventBus()
dispatcher = Dispatcher(config, create_channel)
# Build include_toggles from config
digest_cfg = getattr(config.notifications, "digest", None)
include_toggles = None
if digest_cfg is not None:
include_list = getattr(digest_cfg, "include", None)
if include_list:
include_toggles = list(include_list)
digest = DigestAccumulator(include_toggles=include_toggles)
severity_router = SeverityRouter(
immediate_handler=dispatcher.dispatch,
digest_handler=digest.enqueue,
)
grouper = Grouper(next_handler=severity_router.handle)
inhibitor = Inhibitor(next_handler=grouper.handle)
bus.subscribe(inhibitor.handle)
return bus, inhibitor, grouper, severity_router, dispatcher, digest
async def start_pipeline(bus: EventBus, config) -> DigestScheduler:
"""Start the pipeline's async components (scheduler).
Args:
bus: EventBus returned by build_pipeline()
config: Config object with notifications.digest settings
Returns:
DigestScheduler instance (running). Call stop_pipeline() to stop.
"""
components = getattr(bus, "_pipeline_components", None)
if components is None:
raise RuntimeError("bus missing _pipeline_components; use build_pipeline()")
digest = components["digest"]
scheduler = DigestScheduler(
accumulator=digest,
config=config,
channel_factory=create_channel,
)
await scheduler.start()
# Stash scheduler for stop_pipeline
bus._pipeline_scheduler = scheduler
return scheduler
async def stop_pipeline(scheduler: DigestScheduler) -> None:
"""Stop the pipeline's async components.
Args:
scheduler: DigestScheduler returned by start_pipeline()
"""
if scheduler is not None:
await scheduler.stop()
__all__ = [
"EventBus",
"SeverityRouter",
"StubDigestQueue",
"Dispatcher",
"Inhibitor",
"Grouper",
"DigestAccumulator",
"Digest",
"DigestScheduler",
"build_pipeline",
"build_pipeline_components",
"start_pipeline",
"stop_pipeline",
"get_bus",
]
"""Notification pipeline package.
Phase 2.1 + 2.2 + 2.3a + 2.3b:
- EventBus: pub/sub ingress
- Inhibitor: suppresses redundant events by inhibit_keys
- Grouper: coalesces events sharing group_key within a window
- SeverityRouter: forks immediate vs digest
- Dispatcher: routes immediate via channels (existing rules schema)
- DigestAccumulator: tracks priority/routine events for periodic digest
- DigestScheduler: fires digest at configured time (Phase 2.3b)
Usage:
from meshai.notifications.pipeline import build_pipeline, start_pipeline, stop_pipeline
bus = build_pipeline(config)
bus.emit(event)
# Async lifecycle
scheduler = await start_pipeline(bus, config)
...
await stop_pipeline(scheduler)
"""
from meshai.notifications.channels import create_channel
from meshai.notifications.pipeline.bus import EventBus, get_bus
from meshai.notifications.pipeline.severity_router import (
SeverityRouter,
StubDigestQueue, # kept for Phase 2.1 backward-compat tests
)
from meshai.notifications.pipeline.dispatcher import Dispatcher
from meshai.notifications.pipeline.inhibitor import Inhibitor
from meshai.notifications.pipeline.grouper import Grouper
from meshai.notifications.pipeline.digest import DigestAccumulator, Digest
from meshai.notifications.pipeline.scheduler import DigestScheduler
def build_pipeline(config) -> EventBus:
"""Build the pipeline and return the EventBus.
Components are stashed on bus._pipeline_components for lifecycle use.
"""
bus = EventBus()
dispatcher = Dispatcher(config, create_channel)
# Build include_toggles from config
digest_cfg = getattr(config.notifications, "digest", None)
include_toggles = None
if digest_cfg is not None:
include_list = getattr(digest_cfg, "include", None)
if include_list:
include_toggles = list(include_list)
digest = DigestAccumulator(include_toggles=include_toggles)
severity_router = SeverityRouter(
immediate_handler=dispatcher.dispatch,
digest_handler=digest.enqueue,
)
grouper = Grouper(next_handler=severity_router.handle)
inhibitor = Inhibitor(next_handler=grouper.handle)
bus.subscribe(inhibitor.handle)
# Stash components for lifecycle management
bus._pipeline_components = {
"inhibitor": inhibitor,
"grouper": grouper,
"severity_router": severity_router,
"dispatcher": dispatcher,
"digest": digest,
}
return bus
def build_pipeline_components(config) -> tuple:
"""Like build_pipeline, but returns all components for tests.
Returns (bus, inhibitor, grouper, severity_router, dispatcher, digest).
"""
bus = EventBus()
dispatcher = Dispatcher(config, create_channel)
# Build include_toggles from config
digest_cfg = getattr(config.notifications, "digest", None)
include_toggles = None
if digest_cfg is not None:
include_list = getattr(digest_cfg, "include", None)
if include_list:
include_toggles = list(include_list)
digest = DigestAccumulator(include_toggles=include_toggles)
severity_router = SeverityRouter(
immediate_handler=dispatcher.dispatch,
digest_handler=digest.enqueue,
)
grouper = Grouper(next_handler=severity_router.handle)
inhibitor = Inhibitor(next_handler=grouper.handle)
bus.subscribe(inhibitor.handle)
return bus, inhibitor, grouper, severity_router, dispatcher, digest
async def start_pipeline(bus: EventBus, config) -> DigestScheduler:
"""Start the pipeline's async components (scheduler).
Args:
bus: EventBus returned by build_pipeline()
config: Config object with notifications.digest settings
Returns:
DigestScheduler instance (running). Call stop_pipeline() to stop.
"""
components = getattr(bus, "_pipeline_components", None)
if components is None:
raise RuntimeError("bus missing _pipeline_components; use build_pipeline()")
digest = components["digest"]
scheduler = DigestScheduler(
accumulator=digest,
config=config,
channel_factory=create_channel,
)
await scheduler.start()
# Stash scheduler for stop_pipeline
bus._pipeline_scheduler = scheduler
return scheduler
async def stop_pipeline(scheduler: DigestScheduler) -> None:
"""Stop the pipeline's async components.
Args:
scheduler: DigestScheduler returned by start_pipeline()
"""
if scheduler is not None:
await scheduler.stop()
__all__ = [
"EventBus",
"SeverityRouter",
"StubDigestQueue",
"Dispatcher",
"Inhibitor",
"Grouper",
"DigestAccumulator",
"Digest",
"DigestScheduler",
"build_pipeline",
"build_pipeline_components",
"start_pipeline",
"stop_pipeline",
"get_bus",
]

View file

@ -1,85 +1,85 @@
"""Event bus for the notification pipeline.
The bus is the entry point for all events flowing through the pipeline.
Adapters call bus.emit(event) to push Events into the system.
Usage:
from meshai.notifications.pipeline import get_bus
from meshai.notifications.events import make_event
bus = get_bus()
event = make_event(source="nws", category="weather_warning", severity="immediate", ...)
bus.emit(event)
"""
import logging
from typing import Callable, Iterable
from meshai.notifications.events import Event
class EventBus:
"""Central event bus for the notification pipeline.
Subscribers register handlers that receive every emitted event.
Errors in one subscriber do not prevent other subscribers from
receiving the event.
"""
def __init__(self):
self._subscribers: list[Callable[[Event], None]] = []
self._logger = logging.getLogger("meshai.pipeline.bus")
def subscribe(self, handler: Callable[[Event], None]) -> None:
"""Register a handler that receives every emitted event.
Args:
handler: Callable that takes an Event and returns None
"""
self._subscribers.append(handler)
self._logger.debug(f"Subscribed handler: {handler}")
def emit(self, event: Event) -> None:
"""Push an event to all subscribers.
Errors in one subscriber do not stop others from receiving
the event. Exceptions are logged but not re-raised.
Args:
event: The Event to deliver to all subscribers
"""
for handler in self._subscribers:
try:
handler(event)
except Exception:
self._logger.exception(
f"Subscriber {handler} failed on event {event.id}"
)
def emit_many(self, events: Iterable[Event]) -> None:
"""Emit multiple events in sequence.
Args:
events: Iterable of Events to emit
"""
for event in events:
self.emit(event)
# Module-level singleton for application-wide use
_bus: EventBus | None = None
def get_bus() -> EventBus:
"""Get the global EventBus singleton.
This is the primary way adapters access the bus. Tests should
construct a fresh EventBus() directly to avoid shared state.
Returns:
The global EventBus instance
"""
global _bus
if _bus is None:
_bus = EventBus()
return _bus
"""Event bus for the notification pipeline.
The bus is the entry point for all events flowing through the pipeline.
Adapters call bus.emit(event) to push Events into the system.
Usage:
from meshai.notifications.pipeline import get_bus
from meshai.notifications.events import make_event
bus = get_bus()
event = make_event(source="nws", category="weather_warning", severity="immediate", ...)
bus.emit(event)
"""
import logging
from typing import Callable, Iterable
from meshai.notifications.events import Event
class EventBus:
"""Central event bus for the notification pipeline.
Subscribers register handlers that receive every emitted event.
Errors in one subscriber do not prevent other subscribers from
receiving the event.
"""
def __init__(self):
self._subscribers: list[Callable[[Event], None]] = []
self._logger = logging.getLogger("meshai.pipeline.bus")
def subscribe(self, handler: Callable[[Event], None]) -> None:
"""Register a handler that receives every emitted event.
Args:
handler: Callable that takes an Event and returns None
"""
self._subscribers.append(handler)
self._logger.debug(f"Subscribed handler: {handler}")
def emit(self, event: Event) -> None:
"""Push an event to all subscribers.
Errors in one subscriber do not stop others from receiving
the event. Exceptions are logged but not re-raised.
Args:
event: The Event to deliver to all subscribers
"""
for handler in self._subscribers:
try:
handler(event)
except Exception:
self._logger.exception(
f"Subscriber {handler} failed on event {event.id}"
)
def emit_many(self, events: Iterable[Event]) -> None:
"""Emit multiple events in sequence.
Args:
events: Iterable of Events to emit
"""
for event in events:
self.emit(event)
# Module-level singleton for application-wide use
_bus: EventBus | None = None
def get_bus() -> EventBus:
"""Get the global EventBus singleton.
This is the primary way adapters access the bus. Tests should
construct a fresh EventBus() directly to avoid shared state.
Returns:
The global EventBus instance
"""
global _bus
if _bus is None:
_bus = EventBus()
return _bus

View file

@ -1,458 +1,458 @@
"""Digest accumulator and renderer for Phase 2.3a.
Holds priority and routine events between digest emissions, tracks
active vs recently-resolved events, and renders the two-section
digest output (ACTIVE NOW + SINCE LAST DIGEST) when called.
No scheduling logic here. render_digest() is called explicitly by
the future scheduler (Phase 2.3b) or by tests.
"""
import logging
import time
from dataclasses import dataclass, field
from typing import Optional
from meshai.notifications.events import Event
from meshai.notifications.categories import get_toggle
# Lowercase substrings in event.title that indicate the event is
# a resolution of a prior alert. Conservative list — easy to extend.
RESOLUTION_MARKERS = (
"cleared",
"reopened",
"ended",
"resolved",
"back online",
"recovered",
"lifted",
)
# Display labels per toggle (used in rendered output)
TOGGLE_LABELS = {
"mesh_health": "Mesh",
"weather": "Weather",
"fire": "Fire",
"rf_propagation": "RF",
"roads": "Roads",
"avalanche": "Avalanche",
"seismic": "Seismic",
"tracking": "Tracking",
"other": "Other",
}
# Toggle sort order in digest output (most operationally urgent first)
TOGGLE_ORDER = [
"weather",
"fire",
"seismic",
"avalanche",
"roads",
"rf_propagation",
"mesh_health",
"tracking",
"other",
]
@dataclass
class Digest:
"""Result of render_digest(). Carries both sections and metadata."""
rendered_at: float
active: dict[str, list[Event]] = field(default_factory=dict)
since_last: dict[str, list[Event]] = field(default_factory=dict)
mesh_chunks: list[str] = field(default_factory=list)
mesh_compact: str = ""
full: str = ""
def is_empty(self) -> bool:
return not self.active and not self.since_last
class DigestAccumulator:
"""Tracks priority/routine events and produces periodic digests.
Args:
mesh_char_limit: Maximum characters per mesh chunk (default 200).
include_toggles: List of toggle names to include in digest output.
If None, defaults to all toggles in TOGGLE_ORDER except
rf_propagation. Unknown toggle names in the list are silently
accepted (TOGGLE_ORDER drives display order, include_toggles
drives which toggles are tracked).
"""
def __init__(
self,
mesh_char_limit: int = 200,
include_toggles: list[str] | None = None,
):
self._active: dict[str, list[Event]] = {} # toggle -> events
self._since_last: dict[str, list[Event]] = {} # toggle -> events
self._last_digest_at: float = 0.0
self._mesh_char_limit = mesh_char_limit
# Default: all known toggles except rf_propagation
if include_toggles is None:
self._included = set(TOGGLE_ORDER) - {"rf_propagation"}
else:
self._included = set(include_toggles)
self._logger = logging.getLogger("meshai.pipeline.digest")
# ---- ingress ----
def enqueue(self, event: Event) -> None:
"""SeverityRouter calls this for priority/routine events."""
toggle = get_toggle(event.category) or "other"
# Skip non-included toggles
if toggle not in self._included:
self._logger.debug(
f"skipping digest enqueue for non-included toggle {toggle}"
)
return
active_for_toggle = self._active.setdefault(toggle, [])
# Resolution detection
if self._is_resolution(event, self._now()):
self._move_to_since_last_by_group(event, toggle)
return
# In-place update if same id
for i, existing in enumerate(active_for_toggle):
if existing.id == event.id:
active_for_toggle[i] = event
self._logger.debug(
f"UPDATED active event {event.id} in {toggle}"
)
return
# Otherwise it's a new active event
active_for_toggle.append(event)
self._logger.debug(
f"ADDED active event {event.id} ({toggle}/{event.category})"
)
def tick(self, now: Optional[float] = None) -> int:
"""Move expired events from active to since_last.
Returns the number of events moved.
"""
if now is None:
now = self._now()
moved = 0
for toggle in list(self._active.keys()):
still_active = []
for ev in self._active[toggle]:
if ev.expires is not None and ev.expires <= now:
self._since_last.setdefault(toggle, []).append(ev)
moved += 1
else:
still_active.append(ev)
self._active[toggle] = still_active
return moved
# ---- rendering ----
def render_digest(self, now: Optional[float] = None) -> Digest:
"""Produce a Digest of current state, then clear since_last."""
if now is None:
now = self._now()
# tick() first so expired actives roll into since_last
self.tick(now)
digest = Digest(rendered_at=now)
# Defensive: skip non-included toggles when building output
digest.active = {
k: list(v) for k, v in self._active.items()
if v and k in self._included
}
digest.since_last = {
k: list(v) for k, v in self._since_last.items()
if v and k in self._included
}
digest.mesh_chunks = self._render_mesh_chunks(digest, now)
# mesh_compact: join chunks for backward compatibility
if len(digest.mesh_chunks) == 1:
digest.mesh_compact = digest.mesh_chunks[0]
else:
digest.mesh_compact = "\n---\n".join(digest.mesh_chunks)
digest.full = self._render_full(digest, now)
# Clear since_last; active stays for the next cycle
self._since_last.clear()
self._last_digest_at = now
return digest
def _render_mesh_chunks(self, digest: Digest, now: float) -> list[str]:
"""Produce mesh-radio-friendly compact chunks.
Returns a list of strings, each self._mesh_char_limit chars.
Single-chunk output has no "(1/N)" suffix. Multi-chunk output
has "(k/N)" counters and "(cont)" suffixes on section headers
that span chunks.
"""
time_str = time.strftime('%H%M', time.localtime(now))
# Empty digest case
if not digest.active and not digest.since_last:
return [f"DIGEST {time_str}\nNo alerts since last digest."]
# Build logical lines with section markers
# Each item is (section, line) where section is "active", "resolved", or None
logical_lines: list[tuple[str | None, str]] = []
if digest.active:
logical_lines.append(("active", "ACTIVE NOW"))
for toggle in TOGGLE_ORDER:
events = digest.active.get(toggle)
if not events:
continue
logical_lines.append(("active", self._compact_toggle_line(toggle, events)))
if digest.since_last:
logical_lines.append(("resolved", "RESOLVED"))
for toggle in TOGGLE_ORDER:
events = digest.since_last.get(toggle)
if not events:
continue
logical_lines.append(("resolved", self._compact_toggle_line(toggle, events)))
# Pack lines into chunks
return self._pack_lines_into_chunks(logical_lines, time_str)
def _pack_lines_into_chunks(
self,
logical_lines: list[tuple[str | None, str]],
time_str: str,
) -> list[str]:
"""Pack logical lines into chunks respecting char limit.
Args:
logical_lines: List of (section, line) tuples where section
is "active", "resolved", or None for headers.
time_str: Time string for headers (e.g., "0700").
Returns:
List of chunk strings, each self._mesh_char_limit.
"""
if not logical_lines:
return [f"DIGEST {time_str}\nNo alerts since last digest."]
limit = self._mesh_char_limit
chunks: list[list[str]] = [] # List of line lists
current_chunk: list[str] = []
current_len = 0
last_section_in_chunk: str | None = None
sections_started: set[str] = set()
# Placeholder header - will be fixed up later
header_placeholder = f"DIGEST {time_str}"
def start_new_chunk():
nonlocal current_chunk, current_len, last_section_in_chunk
if current_chunk:
chunks.append(current_chunk)
current_chunk = [header_placeholder]
current_len = len(header_placeholder)
last_section_in_chunk = None
start_new_chunk()
i = 0
while i < len(logical_lines):
section, line = logical_lines[i]
is_section_header = line in ("ACTIVE NOW", "RESOLVED")
# Check if this is a section header - ensure it has at least one
# toggle line following it in this chunk
if is_section_header:
# Look ahead for the next toggle line
next_toggle_idx = i + 1
if next_toggle_idx < len(logical_lines):
_, next_line = logical_lines[next_toggle_idx]
# Calculate space needed for header + newline + next line
needed = len(line) + 1 + len(next_line)
if current_len + 1 + needed > limit:
# Section header + next line won't fit, start new chunk
start_new_chunk()
sections_started.add(section)
last_section_in_chunk = section
current_chunk.append(line)
current_len += 1 + len(line)
i += 1
continue
# Calculate line length with newline
line_with_newline = 1 + len(line) # newline before line
# Would this line fit?
if current_len + line_with_newline > limit:
# Start new chunk
start_new_chunk()
# If continuing a section, add "(cont)" header
if section and section in sections_started and not is_section_header:
cont_header = "ACTIVE NOW (cont)" if section == "active" else "RESOLVED (cont)"
current_chunk.append(cont_header)
current_len += 1 + len(cont_header)
last_section_in_chunk = section
# Add the line
if is_section_header:
sections_started.add(section)
last_section_in_chunk = section
current_chunk.append(line)
current_len += 1 + len(line)
i += 1
# Don't forget the last chunk
if current_chunk and len(current_chunk) > 1: # More than just header
chunks.append(current_chunk)
elif current_chunk and len(current_chunk) == 1:
# Only header in chunk - shouldn't happen but handle gracefully
if chunks:
# Merge with previous chunk if possible
pass
else:
chunks.append(current_chunk)
# Fix up headers with chunk counts
total_chunks = len(chunks)
result: list[str] = []
for idx, chunk_lines in enumerate(chunks):
# Fix header line
if total_chunks == 1:
chunk_lines[0] = f"DIGEST {time_str}"
else:
chunk_lines[0] = f"DIGEST {time_str} ({idx + 1}/{total_chunks})"
result.append("\n".join(chunk_lines))
return result if result else [f"DIGEST {time_str}\nNo alerts since last digest."]
def _compact_toggle_line(self, toggle: str, events: list[Event]) -> str:
"""Build one compact line for a toggle: [Label] headline (+N)"""
label = TOGGLE_LABELS.get(toggle, toggle)
sorted_events = self._sort_events(events)
top_event = sorted_events[0]
# Get headline text
headline = top_event.summary or top_event.title or top_event.category
# Truncate headline at ~60 chars to keep lines readable
max_headline = 60
if len(headline) > max_headline:
headline = headline[:max_headline - 1] + ""
# Append (+N) if more than one event
overflow = len(events) - 1
if overflow > 0:
return f"[{label}] {headline} (+{overflow})"
else:
return f"[{label}] {headline}"
def _render_full(self, digest: Digest, now: float) -> str:
"""Produce the full multi-line digest for email/webhook."""
lines = [
f"--- {time.strftime('%H%M', time.localtime(now))} Digest ---",
"",
]
if not digest.active and not digest.since_last:
lines.append("No alerts since last digest.")
lines.append("")
else:
if digest.active:
lines.append("ACTIVE NOW:")
for toggle in TOGGLE_ORDER:
events = digest.active.get(toggle)
if not events:
continue
label = TOGGLE_LABELS.get(toggle, toggle)
for ev in self._sort_events(events):
lines.append(f" [{label}] {self._format_event_line(ev)}")
lines.append("")
if digest.since_last:
lines.append("SINCE LAST DIGEST:")
for toggle in TOGGLE_ORDER:
events = digest.since_last.get(toggle)
if not events:
continue
label = TOGGLE_LABELS.get(toggle, toggle)
for ev in self._sort_events(events):
lines.append(f" [{label}] {self._format_event_line(ev)}")
lines.append("")
return "\n".join(lines).rstrip() + "\n"
def _format_event_line(self, event: Event) -> str:
"""Single-line summary of an event for digest output."""
# Prefer event.summary if set, else fall back to title, then category
text = event.summary or event.title or event.category
# Trim runaway text — keep digest readable
if len(text) > 140:
text = text[:139] + ""
return text
def _sort_events(self, events: list[Event]) -> list[Event]:
"""Sort within a toggle: immediate first, then priority,
then routine, then by timestamp newest first."""
rank = {"immediate": 0, "priority": 1, "routine": 2}
return sorted(
events,
key=lambda e: (rank.get(e.severity, 3), -e.timestamp),
)
# ---- helpers ----
def _is_resolution(self, event: Event, now: float) -> bool:
if event.expires is not None and event.expires <= now:
return True
title_lc = (event.title or "").lower()
return any(marker in title_lc for marker in RESOLUTION_MARKERS)
def _move_to_since_last_by_group(self, event: Event, toggle: str) -> None:
"""Remove any active event matching event's group_key (or id)
and place this resolution event into since_last.
"""
active_list = self._active.get(toggle, [])
# Match by group_key if set, else by id
match_key = event.group_key
if match_key:
self._active[toggle] = [
e for e in active_list
if e.group_key != match_key
]
else:
self._active[toggle] = [
e for e in active_list if e.id != event.id
]
self._since_last.setdefault(toggle, []).append(event)
self._logger.debug(
f"RESOLVED in {toggle}: {event.id} ({event.title!r})"
)
def _now(self) -> float:
return time.time()
# ---- inspection (for tests and future scheduler) ----
def active_count(self, toggle: Optional[str] = None) -> int:
if toggle is not None:
return len(self._active.get(toggle, []))
return sum(len(v) for v in self._active.values())
def since_last_count(self, toggle: Optional[str] = None) -> int:
if toggle is not None:
return len(self._since_last.get(toggle, []))
return sum(len(v) for v in self._since_last.values())
def last_digest_at(self) -> float:
return self._last_digest_at
def clear(self) -> None:
self._active.clear()
self._since_last.clear()
self._last_digest_at = 0.0
"""Digest accumulator and renderer for Phase 2.3a.
Holds priority and routine events between digest emissions, tracks
active vs recently-resolved events, and renders the two-section
digest output (ACTIVE NOW + SINCE LAST DIGEST) when called.
No scheduling logic here. render_digest() is called explicitly by
the future scheduler (Phase 2.3b) or by tests.
"""
import logging
import time
from dataclasses import dataclass, field
from typing import Optional
from meshai.notifications.events import Event
from meshai.notifications.categories import get_toggle
# Lowercase substrings in event.title that indicate the event is
# a resolution of a prior alert. Conservative list — easy to extend.
RESOLUTION_MARKERS = (
"cleared",
"reopened",
"ended",
"resolved",
"back online",
"recovered",
"lifted",
)
# Display labels per toggle (used in rendered output)
TOGGLE_LABELS = {
"mesh_health": "Mesh",
"weather": "Weather",
"fire": "Fire",
"rf_propagation": "RF",
"roads": "Roads",
"avalanche": "Avalanche",
"seismic": "Seismic",
"tracking": "Tracking",
"other": "Other",
}
# Toggle sort order in digest output (most operationally urgent first)
TOGGLE_ORDER = [
"weather",
"fire",
"seismic",
"avalanche",
"roads",
"rf_propagation",
"mesh_health",
"tracking",
"other",
]
@dataclass
class Digest:
"""Result of render_digest(). Carries both sections and metadata."""
rendered_at: float
active: dict[str, list[Event]] = field(default_factory=dict)
since_last: dict[str, list[Event]] = field(default_factory=dict)
mesh_chunks: list[str] = field(default_factory=list)
mesh_compact: str = ""
full: str = ""
def is_empty(self) -> bool:
return not self.active and not self.since_last
class DigestAccumulator:
"""Tracks priority/routine events and produces periodic digests.
Args:
mesh_char_limit: Maximum characters per mesh chunk (default 200).
include_toggles: List of toggle names to include in digest output.
If None, defaults to all toggles in TOGGLE_ORDER except
rf_propagation. Unknown toggle names in the list are silently
accepted (TOGGLE_ORDER drives display order, include_toggles
drives which toggles are tracked).
"""
def __init__(
self,
mesh_char_limit: int = 200,
include_toggles: list[str] | None = None,
):
self._active: dict[str, list[Event]] = {} # toggle -> events
self._since_last: dict[str, list[Event]] = {} # toggle -> events
self._last_digest_at: float = 0.0
self._mesh_char_limit = mesh_char_limit
# Default: all known toggles except rf_propagation
if include_toggles is None:
self._included = set(TOGGLE_ORDER) - {"rf_propagation"}
else:
self._included = set(include_toggles)
self._logger = logging.getLogger("meshai.pipeline.digest")
# ---- ingress ----
def enqueue(self, event: Event) -> None:
"""SeverityRouter calls this for priority/routine events."""
toggle = get_toggle(event.category) or "other"
# Skip non-included toggles
if toggle not in self._included:
self._logger.debug(
f"skipping digest enqueue for non-included toggle {toggle}"
)
return
active_for_toggle = self._active.setdefault(toggle, [])
# Resolution detection
if self._is_resolution(event, self._now()):
self._move_to_since_last_by_group(event, toggle)
return
# In-place update if same id
for i, existing in enumerate(active_for_toggle):
if existing.id == event.id:
active_for_toggle[i] = event
self._logger.debug(
f"UPDATED active event {event.id} in {toggle}"
)
return
# Otherwise it's a new active event
active_for_toggle.append(event)
self._logger.debug(
f"ADDED active event {event.id} ({toggle}/{event.category})"
)
def tick(self, now: Optional[float] = None) -> int:
"""Move expired events from active to since_last.
Returns the number of events moved.
"""
if now is None:
now = self._now()
moved = 0
for toggle in list(self._active.keys()):
still_active = []
for ev in self._active[toggle]:
if ev.expires is not None and ev.expires <= now:
self._since_last.setdefault(toggle, []).append(ev)
moved += 1
else:
still_active.append(ev)
self._active[toggle] = still_active
return moved
# ---- rendering ----
def render_digest(self, now: Optional[float] = None) -> Digest:
"""Produce a Digest of current state, then clear since_last."""
if now is None:
now = self._now()
# tick() first so expired actives roll into since_last
self.tick(now)
digest = Digest(rendered_at=now)
# Defensive: skip non-included toggles when building output
digest.active = {
k: list(v) for k, v in self._active.items()
if v and k in self._included
}
digest.since_last = {
k: list(v) for k, v in self._since_last.items()
if v and k in self._included
}
digest.mesh_chunks = self._render_mesh_chunks(digest, now)
# mesh_compact: join chunks for backward compatibility
if len(digest.mesh_chunks) == 1:
digest.mesh_compact = digest.mesh_chunks[0]
else:
digest.mesh_compact = "\n---\n".join(digest.mesh_chunks)
digest.full = self._render_full(digest, now)
# Clear since_last; active stays for the next cycle
self._since_last.clear()
self._last_digest_at = now
return digest
def _render_mesh_chunks(self, digest: Digest, now: float) -> list[str]:
"""Produce mesh-radio-friendly compact chunks.
Returns a list of strings, each self._mesh_char_limit chars.
Single-chunk output has no "(1/N)" suffix. Multi-chunk output
has "(k/N)" counters and "(cont)" suffixes on section headers
that span chunks.
"""
time_str = time.strftime('%H%M', time.localtime(now))
# Empty digest case
if not digest.active and not digest.since_last:
return [f"DIGEST {time_str}\nNo alerts since last digest."]
# Build logical lines with section markers
# Each item is (section, line) where section is "active", "resolved", or None
logical_lines: list[tuple[str | None, str]] = []
if digest.active:
logical_lines.append(("active", "ACTIVE NOW"))
for toggle in TOGGLE_ORDER:
events = digest.active.get(toggle)
if not events:
continue
logical_lines.append(("active", self._compact_toggle_line(toggle, events)))
if digest.since_last:
logical_lines.append(("resolved", "RESOLVED"))
for toggle in TOGGLE_ORDER:
events = digest.since_last.get(toggle)
if not events:
continue
logical_lines.append(("resolved", self._compact_toggle_line(toggle, events)))
# Pack lines into chunks
return self._pack_lines_into_chunks(logical_lines, time_str)
def _pack_lines_into_chunks(
self,
logical_lines: list[tuple[str | None, str]],
time_str: str,
) -> list[str]:
"""Pack logical lines into chunks respecting char limit.
Args:
logical_lines: List of (section, line) tuples where section
is "active", "resolved", or None for headers.
time_str: Time string for headers (e.g., "0700").
Returns:
List of chunk strings, each self._mesh_char_limit.
"""
if not logical_lines:
return [f"DIGEST {time_str}\nNo alerts since last digest."]
limit = self._mesh_char_limit
chunks: list[list[str]] = [] # List of line lists
current_chunk: list[str] = []
current_len = 0
last_section_in_chunk: str | None = None
sections_started: set[str] = set()
# Placeholder header - will be fixed up later
header_placeholder = f"DIGEST {time_str}"
def start_new_chunk():
nonlocal current_chunk, current_len, last_section_in_chunk
if current_chunk:
chunks.append(current_chunk)
current_chunk = [header_placeholder]
current_len = len(header_placeholder)
last_section_in_chunk = None
start_new_chunk()
i = 0
while i < len(logical_lines):
section, line = logical_lines[i]
is_section_header = line in ("ACTIVE NOW", "RESOLVED")
# Check if this is a section header - ensure it has at least one
# toggle line following it in this chunk
if is_section_header:
# Look ahead for the next toggle line
next_toggle_idx = i + 1
if next_toggle_idx < len(logical_lines):
_, next_line = logical_lines[next_toggle_idx]
# Calculate space needed for header + newline + next line
needed = len(line) + 1 + len(next_line)
if current_len + 1 + needed > limit:
# Section header + next line won't fit, start new chunk
start_new_chunk()
sections_started.add(section)
last_section_in_chunk = section
current_chunk.append(line)
current_len += 1 + len(line)
i += 1
continue
# Calculate line length with newline
line_with_newline = 1 + len(line) # newline before line
# Would this line fit?
if current_len + line_with_newline > limit:
# Start new chunk
start_new_chunk()
# If continuing a section, add "(cont)" header
if section and section in sections_started and not is_section_header:
cont_header = "ACTIVE NOW (cont)" if section == "active" else "RESOLVED (cont)"
current_chunk.append(cont_header)
current_len += 1 + len(cont_header)
last_section_in_chunk = section
# Add the line
if is_section_header:
sections_started.add(section)
last_section_in_chunk = section
current_chunk.append(line)
current_len += 1 + len(line)
i += 1
# Don't forget the last chunk
if current_chunk and len(current_chunk) > 1: # More than just header
chunks.append(current_chunk)
elif current_chunk and len(current_chunk) == 1:
# Only header in chunk - shouldn't happen but handle gracefully
if chunks:
# Merge with previous chunk if possible
pass
else:
chunks.append(current_chunk)
# Fix up headers with chunk counts
total_chunks = len(chunks)
result: list[str] = []
for idx, chunk_lines in enumerate(chunks):
# Fix header line
if total_chunks == 1:
chunk_lines[0] = f"DIGEST {time_str}"
else:
chunk_lines[0] = f"DIGEST {time_str} ({idx + 1}/{total_chunks})"
result.append("\n".join(chunk_lines))
return result if result else [f"DIGEST {time_str}\nNo alerts since last digest."]
def _compact_toggle_line(self, toggle: str, events: list[Event]) -> str:
"""Build one compact line for a toggle: [Label] headline (+N)"""
label = TOGGLE_LABELS.get(toggle, toggle)
sorted_events = self._sort_events(events)
top_event = sorted_events[0]
# Get headline text
headline = top_event.summary or top_event.title or top_event.category
# Truncate headline at ~60 chars to keep lines readable
max_headline = 60
if len(headline) > max_headline:
headline = headline[:max_headline - 1] + ""
# Append (+N) if more than one event
overflow = len(events) - 1
if overflow > 0:
return f"[{label}] {headline} (+{overflow})"
else:
return f"[{label}] {headline}"
def _render_full(self, digest: Digest, now: float) -> str:
"""Produce the full multi-line digest for email/webhook."""
lines = [
f"--- {time.strftime('%H%M', time.localtime(now))} Digest ---",
"",
]
if not digest.active and not digest.since_last:
lines.append("No alerts since last digest.")
lines.append("")
else:
if digest.active:
lines.append("ACTIVE NOW:")
for toggle in TOGGLE_ORDER:
events = digest.active.get(toggle)
if not events:
continue
label = TOGGLE_LABELS.get(toggle, toggle)
for ev in self._sort_events(events):
lines.append(f" [{label}] {self._format_event_line(ev)}")
lines.append("")
if digest.since_last:
lines.append("SINCE LAST DIGEST:")
for toggle in TOGGLE_ORDER:
events = digest.since_last.get(toggle)
if not events:
continue
label = TOGGLE_LABELS.get(toggle, toggle)
for ev in self._sort_events(events):
lines.append(f" [{label}] {self._format_event_line(ev)}")
lines.append("")
return "\n".join(lines).rstrip() + "\n"
def _format_event_line(self, event: Event) -> str:
"""Single-line summary of an event for digest output."""
# Prefer event.summary if set, else fall back to title, then category
text = event.summary or event.title or event.category
# Trim runaway text — keep digest readable
if len(text) > 140:
text = text[:139] + ""
return text
def _sort_events(self, events: list[Event]) -> list[Event]:
"""Sort within a toggle: immediate first, then priority,
then routine, then by timestamp newest first."""
rank = {"immediate": 0, "priority": 1, "routine": 2}
return sorted(
events,
key=lambda e: (rank.get(e.severity, 3), -e.timestamp),
)
# ---- helpers ----
def _is_resolution(self, event: Event, now: float) -> bool:
if event.expires is not None and event.expires <= now:
return True
title_lc = (event.title or "").lower()
return any(marker in title_lc for marker in RESOLUTION_MARKERS)
def _move_to_since_last_by_group(self, event: Event, toggle: str) -> None:
"""Remove any active event matching event's group_key (or id)
and place this resolution event into since_last.
"""
active_list = self._active.get(toggle, [])
# Match by group_key if set, else by id
match_key = event.group_key
if match_key:
self._active[toggle] = [
e for e in active_list
if e.group_key != match_key
]
else:
self._active[toggle] = [
e for e in active_list if e.id != event.id
]
self._since_last.setdefault(toggle, []).append(event)
self._logger.debug(
f"RESOLVED in {toggle}: {event.id} ({event.title!r})"
)
def _now(self) -> float:
return time.time()
# ---- inspection (for tests and future scheduler) ----
def active_count(self, toggle: Optional[str] = None) -> int:
if toggle is not None:
return len(self._active.get(toggle, []))
return sum(len(v) for v in self._active.values())
def since_last_count(self, toggle: Optional[str] = None) -> int:
if toggle is not None:
return len(self._since_last.get(toggle, []))
return sum(len(v) for v in self._since_last.values())
def last_digest_at(self) -> float:
return self._last_digest_at
def clear(self) -> None:
self._active.clear()
self._since_last.clear()
self._last_digest_at = 0.0

View file

@ -1,213 +1,213 @@
"""Digest scheduler — fires the digest at a configured time of day.
Reads schedule and channel routing from config; calls
accumulator.render_digest() at the scheduled time; delivers the
result to all rules matching trigger_type=='schedule' and
schedule_match=='digest'.
"""
import asyncio
import logging
import time
from datetime import datetime, timedelta
from typing import Callable, Optional
from meshai.notifications.pipeline.digest import DigestAccumulator
class DigestScheduler:
"""Fires digest at configured time and routes to matching channels."""
def __init__(
self,
accumulator: DigestAccumulator,
config,
channel_factory: Callable,
clock: Optional[Callable[[], float]] = None,
sleep: Optional[Callable[[float], "asyncio.Future"]] = None,
):
self._accumulator = accumulator
self._config = config
self._channel_factory = channel_factory
self._clock = clock or time.time
self._sleep = sleep or asyncio.sleep
self._task: Optional[asyncio.Task] = None
self._stop_event: Optional[asyncio.Event] = None
self._last_fire_at: float = 0.0
self._logger = logging.getLogger("meshai.pipeline.scheduler")
async def start(self) -> None:
"""Begin the scheduler loop as an asyncio task."""
if self._task is not None and not self._task.done():
raise RuntimeError("Scheduler already running")
self._stop_event = asyncio.Event()
self._task = asyncio.create_task(self._run(), name="digest-scheduler")
self._logger.info(
f"Digest scheduler started, schedule={self._schedule_str()!r}"
)
async def stop(self) -> None:
"""Signal stop and wait for the task to finish."""
if self._task is None:
return
if self._stop_event:
self._stop_event.set()
self._task.cancel()
try:
await self._task
except (asyncio.CancelledError, Exception):
# Cancellation is expected; other exceptions already logged
pass
self._task = None
self._logger.info("Digest scheduler stopped")
async def _run(self) -> None:
"""Main loop: sleep until next fire, fire, repeat."""
try:
while self._stop_event and not self._stop_event.is_set():
now = self._clock()
next_fire = self._next_fire_at(now)
delay = max(0.0, next_fire - now)
self._logger.info(
f"Next digest at {datetime.fromtimestamp(next_fire):%Y-%m-%d %H:%M}, "
f"sleeping {delay:.0f}s"
)
# Interruptible sleep — wakes early if stop() is called
try:
await asyncio.wait_for(
self._stop_event.wait(),
timeout=delay,
)
# If we got here without timeout, stop was requested
return
except asyncio.TimeoutError:
pass # Timeout fired = digest time arrived
if self._stop_event.is_set():
return
try:
await self._fire(self._clock())
except Exception:
self._logger.exception("Digest fire failed; will retry next cycle")
except asyncio.CancelledError:
raise
except Exception:
self._logger.exception("Scheduler loop crashed unexpectedly")
raise
async def _fire(self, now: float) -> None:
"""Render and deliver one digest."""
self._logger.info(f"Firing digest at {datetime.fromtimestamp(now):%H:%M}")
digest = self._accumulator.render_digest(now)
self._last_fire_at = now
rules = self._matching_rules()
if not rules:
self._logger.warning(
"No digest delivery rules configured (need rules with "
"trigger_type=='schedule' and schedule_match=='digest')"
)
return
for rule in rules:
try:
await self._deliver_to_rule(rule, digest, now)
except Exception:
self._logger.exception(
f"Digest delivery failed for rule {rule.name!r}"
)
async def _deliver_to_rule(self, rule, digest, now: float) -> None:
"""Hand the rendered digest to a channel based on rule.delivery_type."""
channel = self._channel_factory(rule)
delivery_type = rule.delivery_type
if delivery_type in ("mesh_broadcast", "mesh_dm"):
# One deliver call per chunk
chunks = digest.mesh_chunks
total = len(chunks)
for i, chunk in enumerate(chunks, start=1):
payload = {
"category": "digest",
"severity": "routine",
"message": chunk,
"node_id": None,
"region": None,
"timestamp": now,
"chunk_index": i,
"chunk_total": total,
}
channel.deliver(payload)
self._logger.info(
f"Delivered {total} mesh chunk(s) to rule {rule.name!r}"
)
else:
# Single full-form delivery
payload = {
"category": "digest",
"severity": "routine",
"message": digest.full,
"node_id": None,
"region": None,
"timestamp": now,
}
channel.deliver(payload)
self._logger.info(
f"Delivered digest to rule {rule.name!r} via {delivery_type}"
)
def _matching_rules(self) -> list:
"""Find enabled schedule rules tagged as digest deliveries."""
matches = []
for rule in self._config.notifications.rules:
if not rule.enabled:
continue
if rule.trigger_type != "schedule":
continue
# schedule_match is the discriminator. Operators set it to
# "digest" to receive the morning digest. Other values
# reserved for future schedule types.
schedule_match = getattr(rule, "schedule_match", None)
if schedule_match != "digest":
continue
matches.append(rule)
return matches
def _next_fire_at(self, now: float) -> float:
"""Compute the next epoch timestamp when the digest should fire.
Reads schedule HH:MM from config. If today's fire time has
already passed, returns tomorrow's. Uses local timezone.
"""
schedule_str = self._schedule_str()
h, m = self._parse_schedule(schedule_str)
now_dt = datetime.fromtimestamp(now)
target_today = now_dt.replace(hour=h, minute=m, second=0, microsecond=0)
if target_today.timestamp() <= now:
target = target_today + timedelta(days=1)
else:
target = target_today
return target.timestamp()
def _schedule_str(self) -> str:
digest_cfg = getattr(self._config.notifications, "digest", None)
if digest_cfg is None:
return "07:00"
return getattr(digest_cfg, "schedule", "07:00")
@staticmethod
def _parse_schedule(s: str) -> tuple[int, int]:
"""Parse 'HH:MM' to (hour, minute). Falls back to 07:00 on bad input."""
try:
hh, mm = s.strip().split(":", 1)
h = int(hh)
m = int(mm)
if not (0 <= h <= 23 and 0 <= m <= 59):
raise ValueError(f"out of range: {s}")
return h, m
except (ValueError, AttributeError):
# Fall back to 07:00 rather than crash the loop
return 7, 0
def last_fire_at(self) -> float:
return self._last_fire_at
"""Digest scheduler — fires the digest at a configured time of day.
Reads schedule and channel routing from config; calls
accumulator.render_digest() at the scheduled time; delivers the
result to all rules matching trigger_type=='schedule' and
schedule_match=='digest'.
"""
import asyncio
import logging
import time
from datetime import datetime, timedelta
from typing import Callable, Optional
from meshai.notifications.pipeline.digest import DigestAccumulator
class DigestScheduler:
"""Fires digest at configured time and routes to matching channels."""
def __init__(
self,
accumulator: DigestAccumulator,
config,
channel_factory: Callable,
clock: Optional[Callable[[], float]] = None,
sleep: Optional[Callable[[float], "asyncio.Future"]] = None,
):
self._accumulator = accumulator
self._config = config
self._channel_factory = channel_factory
self._clock = clock or time.time
self._sleep = sleep or asyncio.sleep
self._task: Optional[asyncio.Task] = None
self._stop_event: Optional[asyncio.Event] = None
self._last_fire_at: float = 0.0
self._logger = logging.getLogger("meshai.pipeline.scheduler")
async def start(self) -> None:
"""Begin the scheduler loop as an asyncio task."""
if self._task is not None and not self._task.done():
raise RuntimeError("Scheduler already running")
self._stop_event = asyncio.Event()
self._task = asyncio.create_task(self._run(), name="digest-scheduler")
self._logger.info(
f"Digest scheduler started, schedule={self._schedule_str()!r}"
)
async def stop(self) -> None:
"""Signal stop and wait for the task to finish."""
if self._task is None:
return
if self._stop_event:
self._stop_event.set()
self._task.cancel()
try:
await self._task
except (asyncio.CancelledError, Exception):
# Cancellation is expected; other exceptions already logged
pass
self._task = None
self._logger.info("Digest scheduler stopped")
async def _run(self) -> None:
"""Main loop: sleep until next fire, fire, repeat."""
try:
while self._stop_event and not self._stop_event.is_set():
now = self._clock()
next_fire = self._next_fire_at(now)
delay = max(0.0, next_fire - now)
self._logger.info(
f"Next digest at {datetime.fromtimestamp(next_fire):%Y-%m-%d %H:%M}, "
f"sleeping {delay:.0f}s"
)
# Interruptible sleep — wakes early if stop() is called
try:
await asyncio.wait_for(
self._stop_event.wait(),
timeout=delay,
)
# If we got here without timeout, stop was requested
return
except asyncio.TimeoutError:
pass # Timeout fired = digest time arrived
if self._stop_event.is_set():
return
try:
await self._fire(self._clock())
except Exception:
self._logger.exception("Digest fire failed; will retry next cycle")
except asyncio.CancelledError:
raise
except Exception:
self._logger.exception("Scheduler loop crashed unexpectedly")
raise
async def _fire(self, now: float) -> None:
"""Render and deliver one digest."""
self._logger.info(f"Firing digest at {datetime.fromtimestamp(now):%H:%M}")
digest = self._accumulator.render_digest(now)
self._last_fire_at = now
rules = self._matching_rules()
if not rules:
self._logger.warning(
"No digest delivery rules configured (need rules with "
"trigger_type=='schedule' and schedule_match=='digest')"
)
return
for rule in rules:
try:
await self._deliver_to_rule(rule, digest, now)
except Exception:
self._logger.exception(
f"Digest delivery failed for rule {rule.name!r}"
)
async def _deliver_to_rule(self, rule, digest, now: float) -> None:
"""Hand the rendered digest to a channel based on rule.delivery_type."""
channel = self._channel_factory(rule)
delivery_type = rule.delivery_type
if delivery_type in ("mesh_broadcast", "mesh_dm"):
# One deliver call per chunk
chunks = digest.mesh_chunks
total = len(chunks)
for i, chunk in enumerate(chunks, start=1):
payload = {
"category": "digest",
"severity": "routine",
"message": chunk,
"node_id": None,
"region": None,
"timestamp": now,
"chunk_index": i,
"chunk_total": total,
}
channel.deliver(payload)
self._logger.info(
f"Delivered {total} mesh chunk(s) to rule {rule.name!r}"
)
else:
# Single full-form delivery
payload = {
"category": "digest",
"severity": "routine",
"message": digest.full,
"node_id": None,
"region": None,
"timestamp": now,
}
channel.deliver(payload)
self._logger.info(
f"Delivered digest to rule {rule.name!r} via {delivery_type}"
)
def _matching_rules(self) -> list:
"""Find enabled schedule rules tagged as digest deliveries."""
matches = []
for rule in self._config.notifications.rules:
if not rule.enabled:
continue
if rule.trigger_type != "schedule":
continue
# schedule_match is the discriminator. Operators set it to
# "digest" to receive the morning digest. Other values
# reserved for future schedule types.
schedule_match = getattr(rule, "schedule_match", None)
if schedule_match != "digest":
continue
matches.append(rule)
return matches
def _next_fire_at(self, now: float) -> float:
"""Compute the next epoch timestamp when the digest should fire.
Reads schedule HH:MM from config. If today's fire time has
already passed, returns tomorrow's. Uses local timezone.
"""
schedule_str = self._schedule_str()
h, m = self._parse_schedule(schedule_str)
now_dt = datetime.fromtimestamp(now)
target_today = now_dt.replace(hour=h, minute=m, second=0, microsecond=0)
if target_today.timestamp() <= now:
target = target_today + timedelta(days=1)
else:
target = target_today
return target.timestamp()
def _schedule_str(self) -> str:
digest_cfg = getattr(self._config.notifications, "digest", None)
if digest_cfg is None:
return "07:00"
return getattr(digest_cfg, "schedule", "07:00")
@staticmethod
def _parse_schedule(s: str) -> tuple[int, int]:
"""Parse 'HH:MM' to (hour, minute). Falls back to 07:00 on bad input."""
try:
hh, mm = s.strip().split(":", 1)
h = int(hh)
m = int(mm)
if not (0 <= h <= 23 and 0 <= m <= 59):
raise ValueError(f"out of range: {s}")
return h, m
except (ValueError, AttributeError):
# Fall back to 07:00 rather than crash the loop
return 7, 0
def last_fire_at(self) -> float:
return self._last_fire_at

View file

@ -1,104 +1,104 @@
"""Severity-based event routing.
The severity router subscribes to the bus and forks each event into
one of two paths based on severity:
- immediate immediate_handler (dispatcher for live delivery)
- priority/routine digest_handler (queue for batched summaries)
Usage:
router = SeverityRouter(
immediate_handler=dispatcher.dispatch,
digest_handler=digest_queue.enqueue,
)
bus.subscribe(router.handle)
"""
import logging
from typing import Callable
from meshai.notifications.events import Event
from meshai.notifications.categories import get_toggle
class SeverityRouter:
"""Routes events to immediate or digest handlers based on severity.
Immediate-severity events go directly to live delivery channels.
Priority and routine events are queued for periodic digest summaries.
"""
def __init__(
self,
immediate_handler: Callable[[Event], None],
digest_handler: Callable[[Event], None],
):
"""Initialize the severity router.
Args:
immediate_handler: Called for severity="immediate" events
digest_handler: Called for severity in ("priority", "routine")
"""
self._immediate = immediate_handler
self._digest = digest_handler
self._logger = logging.getLogger("meshai.pipeline.severity_router")
def handle(self, event: Event) -> None:
"""Route an event based on its severity.
Args:
event: The Event to route
"""
if event.severity == "immediate":
self._logger.info(
f"IMMEDIATE: {event.source}/{event.category} {event.title}"
)
self._immediate(event)
elif event.severity in ("priority", "routine"):
self._logger.info(
f"DIGEST QUEUED [{event.severity}]: {event.title}"
)
self._digest(event)
else:
self._logger.warning(
f"Unknown severity {event.severity!r} on event {event.id}, dropping"
)
class StubDigestQueue:
"""Placeholder digest queue for Phase 2.1.
This is a stub that simply collects events in memory. Phase 2.3
will replace this with the real aggregator that renders and
delivers periodic digest summaries.
"""
def __init__(self):
self._queue: list[Event] = []
self._logger = logging.getLogger("meshai.pipeline.digest_stub")
def enqueue(self, event: Event) -> None:
"""Add an event to the digest queue.
Args:
event: The Event to queue for digest delivery
"""
self._queue.append(event)
toggle = get_toggle(event.category) or "unknown"
self._logger.info(f"DIGEST QUEUED [{toggle}]: {event.title}")
def drain(self) -> list[Event]:
"""Return and clear all queued events.
For tests and the future aggregator. Returns the current
queue contents and resets the queue to empty.
Returns:
List of all queued Events
"""
events, self._queue = self._queue, []
return events
def __len__(self) -> int:
"""Return the number of queued events."""
return len(self._queue)
"""Severity-based event routing.
The severity router subscribes to the bus and forks each event into
one of two paths based on severity:
- immediate immediate_handler (dispatcher for live delivery)
- priority/routine digest_handler (queue for batched summaries)
Usage:
router = SeverityRouter(
immediate_handler=dispatcher.dispatch,
digest_handler=digest_queue.enqueue,
)
bus.subscribe(router.handle)
"""
import logging
from typing import Callable
from meshai.notifications.events import Event
from meshai.notifications.categories import get_toggle
class SeverityRouter:
"""Routes events to immediate or digest handlers based on severity.
Immediate-severity events go directly to live delivery channels.
Priority and routine events are queued for periodic digest summaries.
"""
def __init__(
self,
immediate_handler: Callable[[Event], None],
digest_handler: Callable[[Event], None],
):
"""Initialize the severity router.
Args:
immediate_handler: Called for severity="immediate" events
digest_handler: Called for severity in ("priority", "routine")
"""
self._immediate = immediate_handler
self._digest = digest_handler
self._logger = logging.getLogger("meshai.pipeline.severity_router")
def handle(self, event: Event) -> None:
"""Route an event based on its severity.
Args:
event: The Event to route
"""
if event.severity == "immediate":
self._logger.info(
f"IMMEDIATE: {event.source}/{event.category} {event.title}"
)
self._immediate(event)
elif event.severity in ("priority", "routine"):
self._logger.info(
f"DIGEST QUEUED [{event.severity}]: {event.title}"
)
self._digest(event)
else:
self._logger.warning(
f"Unknown severity {event.severity!r} on event {event.id}, dropping"
)
class StubDigestQueue:
"""Placeholder digest queue for Phase 2.1.
This is a stub that simply collects events in memory. Phase 2.3
will replace this with the real aggregator that renders and
delivers periodic digest summaries.
"""
def __init__(self):
self._queue: list[Event] = []
self._logger = logging.getLogger("meshai.pipeline.digest_stub")
def enqueue(self, event: Event) -> None:
"""Add an event to the digest queue.
Args:
event: The Event to queue for digest delivery
"""
self._queue.append(event)
toggle = get_toggle(event.category) or "unknown"
self._logger.info(f"DIGEST QUEUED [{toggle}]: {event.title}")
def drain(self) -> list[Event]:
"""Return and clear all queued events.
For tests and the future aggregator. Returns the current
queue contents and resets the queue to empty.
Returns:
List of all queued Events
"""
events, self._queue = self._queue, []
return events
def __len__(self) -> int:
"""Return the number of queued events."""
return len(self._queue)

View file

@ -1,160 +1,160 @@
"""Region tagger for mapping coordinates and NWS zones to regions.
This module provides functions to:
- Map lat/lon coordinates to the nearest configured region
- Map NWS zone codes to matching regions
Usage:
from meshai.notifications.region_tagger import tag_by_coordinates, tag_by_nws_zone
from meshai.config import RegionAnchor
regions = [
RegionAnchor(name="South Western ID", lat=43.615, lon=-116.2023,
nws_zones=["IDZ016", "IDZ030"]),
RegionAnchor(name="Magic Valley", lat=42.5558, lon=-114.4701,
nws_zones=["IDZ031"]),
]
# Find region by coordinates
region = tag_by_coordinates(43.6, -116.2, regions)
# Returns: "South Western ID"
# Find regions by NWS zone
regions = tag_by_nws_zone("IDZ016", regions)
# Returns: ["South Western ID"]
"""
import math
from typing import Optional
# Import RegionAnchor type for annotations
# Actual import happens at function call time to avoid circular imports
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from meshai.config import RegionAnchor
# Earth radius in miles (mean radius)
EARTH_RADIUS_MILES = 3958.8
def haversine_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
"""Calculate the great-circle distance between two points on Earth.
Uses the haversine formula for accuracy on a spherical Earth model.
Args:
lat1: Latitude of first point in degrees
lon1: Longitude of first point in degrees
lat2: Latitude of second point in degrees
lon2: Longitude of second point in degrees
Returns:
Distance in miles
"""
# Convert to radians
lat1_rad = math.radians(lat1)
lat2_rad = math.radians(lat2)
lon1_rad = math.radians(lon1)
lon2_rad = math.radians(lon2)
# Differences
dlat = lat2_rad - lat1_rad
dlon = lon2_rad - lon1_rad
# Haversine formula
a = math.sin(dlat / 2) ** 2 + math.cos(lat1_rad) * math.cos(lat2_rad) * math.sin(dlon / 2) ** 2
c = 2 * math.asin(math.sqrt(a))
return EARTH_RADIUS_MILES * c
def tag_by_coordinates(
lat: float,
lon: float,
regions: list, # list[RegionAnchor]
radius_miles: float = 25.0,
) -> Optional[str]:
"""Return the name of the nearest region within radius_miles.
Finds the closest region anchor to the given coordinates. If the
closest anchor is within radius_miles, returns its name. Otherwise
returns None.
Args:
lat: Latitude of the point to tag
lon: Longitude of the point to tag
regions: List of RegionAnchor objects to search
radius_miles: Maximum distance to consider (default 25 miles)
Returns:
Name of the nearest region within range, or None if no match
"""
if not regions:
return None
closest_region = None
closest_distance = float("inf")
for region in regions:
# Skip regions without valid coordinates
region_lat = getattr(region, "lat", None)
region_lon = getattr(region, "lon", None)
if region_lat is None or region_lon is None:
continue
if region_lat == 0.0 and region_lon == 0.0:
# Treat (0, 0) as unset coordinates
continue
distance = haversine_distance(lat, lon, region_lat, region_lon)
if distance < closest_distance:
closest_distance = distance
closest_region = region
# Check if closest is within radius
if closest_region is not None and closest_distance <= radius_miles:
return getattr(closest_region, "name", None)
return None
def tag_by_nws_zone(
zone_code: str,
regions: list, # list[RegionAnchor]
) -> list[str]:
"""Return all region names whose nws_zones list contains zone_code.
Multiple regions can match the same zone (a zone may span multiple
configured regions).
Args:
zone_code: NWS zone code to match (e.g., "IDZ016")
regions: List of RegionAnchor objects to search
Returns:
List of region names that contain this zone, empty if no matches
"""
if not zone_code or not regions:
return []
# Normalize zone code to uppercase for case-insensitive matching
zone_upper = zone_code.upper().strip()
matching_regions = []
for region in regions:
region_zones = getattr(region, "nws_zones", None)
if not region_zones:
continue
# Check if zone matches any in this region's list (case-insensitive)
for rz in region_zones:
if rz.upper().strip() == zone_upper:
region_name = getattr(region, "name", None)
if region_name:
matching_regions.append(region_name)
break # Don't add same region twice
return matching_regions
"""Region tagger for mapping coordinates and NWS zones to regions.
This module provides functions to:
- Map lat/lon coordinates to the nearest configured region
- Map NWS zone codes to matching regions
Usage:
from meshai.notifications.region_tagger import tag_by_coordinates, tag_by_nws_zone
from meshai.config import RegionAnchor
regions = [
RegionAnchor(name="South Western ID", lat=43.615, lon=-116.2023,
nws_zones=["IDZ016", "IDZ030"]),
RegionAnchor(name="Magic Valley", lat=42.5558, lon=-114.4701,
nws_zones=["IDZ031"]),
]
# Find region by coordinates
region = tag_by_coordinates(43.6, -116.2, regions)
# Returns: "South Western ID"
# Find regions by NWS zone
regions = tag_by_nws_zone("IDZ016", regions)
# Returns: ["South Western ID"]
"""
import math
from typing import Optional
# Import RegionAnchor type for annotations
# Actual import happens at function call time to avoid circular imports
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from meshai.config import RegionAnchor
# Earth radius in miles (mean radius)
EARTH_RADIUS_MILES = 3958.8
def haversine_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
"""Calculate the great-circle distance between two points on Earth.
Uses the haversine formula for accuracy on a spherical Earth model.
Args:
lat1: Latitude of first point in degrees
lon1: Longitude of first point in degrees
lat2: Latitude of second point in degrees
lon2: Longitude of second point in degrees
Returns:
Distance in miles
"""
# Convert to radians
lat1_rad = math.radians(lat1)
lat2_rad = math.radians(lat2)
lon1_rad = math.radians(lon1)
lon2_rad = math.radians(lon2)
# Differences
dlat = lat2_rad - lat1_rad
dlon = lon2_rad - lon1_rad
# Haversine formula
a = math.sin(dlat / 2) ** 2 + math.cos(lat1_rad) * math.cos(lat2_rad) * math.sin(dlon / 2) ** 2
c = 2 * math.asin(math.sqrt(a))
return EARTH_RADIUS_MILES * c
def tag_by_coordinates(
lat: float,
lon: float,
regions: list, # list[RegionAnchor]
radius_miles: float = 25.0,
) -> Optional[str]:
"""Return the name of the nearest region within radius_miles.
Finds the closest region anchor to the given coordinates. If the
closest anchor is within radius_miles, returns its name. Otherwise
returns None.
Args:
lat: Latitude of the point to tag
lon: Longitude of the point to tag
regions: List of RegionAnchor objects to search
radius_miles: Maximum distance to consider (default 25 miles)
Returns:
Name of the nearest region within range, or None if no match
"""
if not regions:
return None
closest_region = None
closest_distance = float("inf")
for region in regions:
# Skip regions without valid coordinates
region_lat = getattr(region, "lat", None)
region_lon = getattr(region, "lon", None)
if region_lat is None or region_lon is None:
continue
if region_lat == 0.0 and region_lon == 0.0:
# Treat (0, 0) as unset coordinates
continue
distance = haversine_distance(lat, lon, region_lat, region_lon)
if distance < closest_distance:
closest_distance = distance
closest_region = region
# Check if closest is within radius
if closest_region is not None and closest_distance <= radius_miles:
return getattr(closest_region, "name", None)
return None
def tag_by_nws_zone(
zone_code: str,
regions: list, # list[RegionAnchor]
) -> list[str]:
"""Return all region names whose nws_zones list contains zone_code.
Multiple regions can match the same zone (a zone may span multiple
configured regions).
Args:
zone_code: NWS zone code to match (e.g., "IDZ016")
regions: List of RegionAnchor objects to search
Returns:
List of region names that contain this zone, empty if no matches
"""
if not zone_code or not regions:
return []
# Normalize zone code to uppercase for case-insensitive matching
zone_upper = zone_code.upper().strip()
matching_regions = []
for region in regions:
region_zones = getattr(region, "nws_zones", None)
if not region_zones:
continue
# Check if zone matches any in this region's list (case-insensitive)
for rz in region_zones:
if rz.upper().strip() == zone_upper:
region_name = getattr(region, "name", None)
if region_name:
matching_regions.append(region_name)
break # Don't add same region twice
return matching_regions

View file

@ -1,64 +1,64 @@
"""Message summarizer for mesh delivery."""
import logging
from typing import Optional, TYPE_CHECKING
if TYPE_CHECKING:
from ..backends import LLMBackend
logger = logging.getLogger(__name__)
class MessageSummarizer:
"""Summarizes long messages for mesh delivery.
Only used when:
- Delivering to mesh channels (broadcast or DM)
- Message exceeds max_chars (default 200)
- LLM backend is available
Email and webhook channels receive full messages.
"""
def __init__(self, llm_backend: Optional["LLMBackend"] = None):
self._llm = llm_backend
async def summarize(self, message: str, max_chars: int = 195) -> str:
"""Summarize a message to fit within max_chars.
Args:
message: Original message text
max_chars: Maximum characters for summary
Returns:
Summarized message, or truncated original if LLM unavailable
"""
if len(message) <= max_chars:
return message
if not self._llm:
return message[:max_chars - 3] + "..."
prompt = (
"Summarize this alert in under %d characters. "
"Keep severity, location, and key facts. No preamble, just the summary:\n\n%s"
% (max_chars, message)
)
try:
# Use the LLM to generate a summary
response = await self._llm.generate(
prompt,
system_prompt="You are a concise alert summarizer. Output only the summary, no explanation.",
)
summary = response.strip()
# Ensure it fits
if len(summary) <= max_chars:
return summary
return summary[:max_chars - 3] + "..."
except Exception as e:
logger.debug("LLM summarization failed: %s", e)
return message[:max_chars - 3] + "..."
"""Message summarizer for mesh delivery."""
import logging
from typing import Optional, TYPE_CHECKING
if TYPE_CHECKING:
from ..backends import LLMBackend
logger = logging.getLogger(__name__)
class MessageSummarizer:
"""Summarizes long messages for mesh delivery.
Only used when:
- Delivering to mesh channels (broadcast or DM)
- Message exceeds max_chars (default 200)
- LLM backend is available
Email and webhook channels receive full messages.
"""
def __init__(self, llm_backend: Optional["LLMBackend"] = None):
self._llm = llm_backend
async def summarize(self, message: str, max_chars: int = 195) -> str:
"""Summarize a message to fit within max_chars.
Args:
message: Original message text
max_chars: Maximum characters for summary
Returns:
Summarized message, or truncated original if LLM unavailable
"""
if len(message) <= max_chars:
return message
if not self._llm:
return message[:max_chars - 3] + "..."
prompt = (
"Summarize this alert in under %d characters. "
"Keep severity, location, and key facts. No preamble, just the summary:\n\n%s"
% (max_chars, message)
)
try:
# Use the LLM to generate a summary
response = await self._llm.generate(
prompt,
system_prompt="You are a concise alert summarizer. Output only the summary, no explanation.",
)
summary = response.strip()
# Ensure it fits
if len(summary) <= max_chars:
return summary
return summary[:max_chars - 3] + "..."
except Exception as e:
logger.debug("LLM summarization failed: %s", e)
return message[:max_chars - 3] + "..."