From 72a7a90f4dbfba2ca4f23c825dd6dbb2e3eefb20 Mon Sep 17 00:00:00 2001 From: zvx-echo6 Date: Wed, 13 May 2026 18:08:09 -0600 Subject: [PATCH] fix(notifications): test shows live data, not just canned examples - Test always shows current data for the rule's feed categories - RF rules show live SFI/Kp/R/S/G and ducting conditions - Weather rules show active NWS alert count and headlines - Fire rules show active fire/hotspot count - Stream rules show current gauge readings - Mesh rules show current health score and infra status - Send Current Conditions delivers live snapshot through channel - Send Test Alert delivers example through channel - Send Live Alert available when real conditions match Co-Authored-By: Claude Opus 4.5 --- dashboard-frontend/src/App.tsx | 25 +- dashboard-frontend/src/components/Layout.tsx | 20 +- dashboard-frontend/src/pages/Reference.tsx | 972 +++++++++++++++++++ meshai/commands/roads_cmd.py | 74 ++ meshai/commands/streams_cmd.py | 73 ++ meshai/commands/subscribe.py | 189 ++-- meshai/dashboard/api/notification_routes.py | 113 +++ meshai/notifications/__init__.py | 6 + meshai/notifications/categories.py | 157 +++ meshai/notifications/channels.py | 308 ++++++ meshai/notifications/router.py | 266 +++++ meshai/notifications/summarizer.py | 64 ++ meshai/sources/mqtt_source.py | 435 +++++++++ 13 files changed, 2627 insertions(+), 75 deletions(-) create mode 100644 dashboard-frontend/src/pages/Reference.tsx create mode 100644 meshai/commands/roads_cmd.py create mode 100644 meshai/commands/streams_cmd.py create mode 100644 meshai/dashboard/api/notification_routes.py create mode 100644 meshai/notifications/__init__.py create mode 100644 meshai/notifications/categories.py create mode 100644 meshai/notifications/channels.py create mode 100644 meshai/notifications/router.py create mode 100644 meshai/notifications/summarizer.py create mode 100644 meshai/sources/mqtt_source.py diff --git a/dashboard-frontend/src/App.tsx b/dashboard-frontend/src/App.tsx index b313ef7..7e330bd 100644 --- a/dashboard-frontend/src/App.tsx +++ b/dashboard-frontend/src/App.tsx @@ -5,18 +5,25 @@ import Mesh from './pages/Mesh' import Environment from './pages/Environment' import Config from './pages/Config' import Alerts from './pages/Alerts' +import Notifications from './pages/Notifications' +import Reference from './pages/Reference' +import { ToastProvider } from './components/ToastProvider' function App() { return ( - - - } /> - } /> - } /> - } /> - } /> - - + + + + } /> + } /> + } /> + } /> + } /> + } /> + } /> + + + ) } diff --git a/dashboard-frontend/src/components/Layout.tsx b/dashboard-frontend/src/components/Layout.tsx index c620da8..c5640a1 100644 --- a/dashboard-frontend/src/components/Layout.tsx +++ b/dashboard-frontend/src/components/Layout.tsx @@ -6,9 +6,12 @@ import { Cloud, Settings, Bell, + BellRing, + BookOpen, } from 'lucide-react' import { fetchStatus, type SystemStatus } from '@/lib/api' import { useWebSocket } from '@/hooks/useWebSocket' +import { useToast } from './ToastProvider' interface LayoutProps { children: ReactNode @@ -20,6 +23,8 @@ const navItems = [ { path: '/environment', label: 'Environment', icon: Cloud }, { path: '/config', label: 'Config', icon: Settings }, { path: '/alerts', label: 'Alerts', icon: Bell }, + { path: '/notifications', label: 'Notifications', icon: BellRing }, + { path: '/reference', label: 'Reference', icon: BookOpen }, ] function formatUptime(seconds: number): string { @@ -39,8 +44,21 @@ function getPageTitle(pathname: string): string { export default function Layout({ children }: LayoutProps) { const location = useLocation() - const { connected } = useWebSocket() + const { connected, lastAlert } = useWebSocket() + const { addToast } = useToast() const [status, setStatus] = useState(null) + const [lastAlertId, setLastAlertId] = useState(null) + + // Trigger toast on new alerts + useEffect(() => { + if (lastAlert) { + const alertId = `${lastAlert.type}-${lastAlert.message}-${lastAlert.timestamp}` + if (alertId !== lastAlertId) { + setLastAlertId(alertId) + addToast(lastAlert) + } + } + }, [lastAlert, lastAlertId, addToast]) const [currentTime, setCurrentTime] = useState(new Date()) useEffect(() => { diff --git a/dashboard-frontend/src/pages/Reference.tsx b/dashboard-frontend/src/pages/Reference.tsx new file mode 100644 index 0000000..dc63542 --- /dev/null +++ b/dashboard-frontend/src/pages/Reference.tsx @@ -0,0 +1,972 @@ +import { useState, useEffect, useRef } from 'react' +import { useLocation } from 'react-router-dom' +import { + Search, Droplets, Flame, Satellite, CloudLightning, Sun, + Radio, Mountain, Car, Construction, Activity, Bell, Terminal, + Code, ExternalLink +} from 'lucide-react' + +// Topic definitions +const TOPICS = [ + { id: 'stream-gauges', label: 'Stream Gauges', icon: Droplets }, + { id: 'wildfire', label: 'Wildfire', icon: Flame }, + { id: 'firms', label: 'Satellite Fire Detection (FIRMS)', icon: Satellite }, + { id: 'weather-alerts', label: 'Weather Alerts', icon: CloudLightning }, + { id: 'solar', label: 'Solar & Geomagnetic', icon: Sun }, + { id: 'ducting', label: 'Tropospheric Ducting', icon: Radio }, + { id: 'avalanche', label: 'Avalanche Danger', icon: Mountain }, + { id: 'traffic', label: 'Traffic Flow', icon: Car }, + { id: 'roads-511', label: 'Road Conditions (511)', icon: Construction }, + { id: 'mesh-health', label: 'Mesh Health', icon: Activity }, + { id: 'notifications', label: 'Notifications', icon: Bell }, + { id: 'commands', label: 'Commands', icon: Terminal }, + { id: 'api', label: 'API Reference', icon: Code }, +] + +// Status indicator component for colored dots +function StatusDot({ color }: { color: 'green' | 'yellow' | 'orange' | 'red' | 'black' }) { + const colorClasses = { + green: 'bg-green-500', + yellow: 'bg-yellow-500', + orange: 'bg-orange-500', + red: 'bg-red-500', + black: 'bg-slate-800 border border-slate-600', + } + return +} + +// Table component styled for dark theme +function RefTable({ headers, rows }: { headers: string[]; rows: (string | React.ReactNode)[][] }) { + return ( +
+ + + + {headers.map((h, i) => ( + + ))} + + + + {rows.map((row, i) => ( + + {row.map((cell, j) => ( + + ))} + + ))} + +
{h}
{cell}
+
+ ) +} + +// External link component +function ExtLink({ href, children }: { href: string; children: React.ReactNode }) { + return ( + + {children} + + ) +} + +// Section header +function SectionHeader({ children }: { children: React.ReactNode }) { + return

{children}

+} + +// Sub-header +function SubHeader({ children }: { children: React.ReactNode }) { + return

{children}

+} + +// Monospace text +function Mono({ children }: { children: React.ReactNode }) { + return {children} +} + +// Topic section wrapper +function TopicSection({ id, title, children }: { id: string; title: string; children: React.ReactNode }) { + return ( +
+

{title}

+
+ {children} +
+
+ ) +} + +export default function Reference() { + const location = useLocation() + const [searchQuery, setSearchQuery] = useState('') + const [activeTopic, setActiveTopic] = useState('stream-gauges') + const contentRef = useRef(null) + + // Handle hash navigation + useEffect(() => { + const hash = location.hash.replace('#', '') + if (hash && TOPICS.find(t => t.id === hash)) { + setActiveTopic(hash) + const element = document.getElementById(hash) + if (element) { + element.scrollIntoView({ behavior: 'smooth' }) + } + } + }, [location.hash]) + + // Filter topics by search + const filteredTopics = TOPICS.filter(t => + t.label.toLowerCase().includes(searchQuery.toLowerCase()) + ) + + const scrollToTopic = (topicId: string) => { + setActiveTopic(topicId) + const element = document.getElementById(topicId) + if (element) { + element.scrollIntoView({ behavior: 'smooth' }) + } + window.history.replaceState(null, '', `#${topicId}`) + } + + return ( +
+ {/* Topic sidebar */} + + + {/* Main content */} +
+
+

+ Everything you need to understand and configure MeshAI's monitoring and alerting systems. +

+ + {/* Stream Gauges */} + + What You're Looking At +

+ MeshAI watches river and stream levels at gauges you configure. Each gauge reports two things: +

+

+ Water Level (Gage Height) — how high the water is, measured in feet. Important: this is NOT the depth of the river. It's the height above a fixed measuring point that's different at every gauge. A reading of "10 feet" at one gauge means something completely different than "10 feet" at another. You can only compare readings from the SAME gauge over time. +

+

+ Flow (Discharge) — how much water is moving past the gauge, in cubic feet per second (CFS). Think of it as the river's "throughput." For scale: +

+
    +
  • A small creek: 50-200 CFS
  • +
  • A mid-size river: 1,000-5,000 CFS
  • +
  • A big river in spring runoff: 10,000+ CFS
  • +
+ + When Does It Flood? +

+ Flood levels are set by the National Weather Service, not USGS. NWS looks at each specific gauge location and decides "at what water level does the road flood? At what level do buildings get water?" Those levels are different everywhere. +

+

Action Stage — water is rising, time to start paying attention. Usually still inside the riverbanks.

+

Minor Flood — low-lying roads start getting water on them. NWS issues a Flood Advisory.

+

Moderate Flood — water in buildings near the river. Some people need to evacuate. NWS issues a Flood Warning.

+

Major Flood — widespread flooding. Many people evacuating. Serious property damage.

+

+ MeshAI automatically looks up the flood levels for your gauge from NWS when you add a site. Some remote gauges don't have flood levels assigned — for those, you set them manually if you know what water levels cause problems in your area. +

+ + Low Water / Drought +

+ There's no official "drought stage" for most gauges. If you need to monitor low water (irrigation, fish habitat), set a manual low-water threshold based on what you know about your local river. +

+ + Setting It Up +
    +
  1. Find your gauge at waterdata.usgs.gov/nwis
  2. +
  3. Copy the site number (like 13090500)
  4. +
  5. Add it in Config → Environmental → USGS
  6. +
  7. MeshAI auto-fills the gauge name and flood levels from NWS
  8. +
+

If NWS flood levels don't populate, your gauge may not have them. Set manual thresholds if you know your local conditions.

+ + Learn More +
    +
  • USGS Water Data — find gauges near you
  • +
  • NWS Water Prediction Service — flood forecasts and thresholds
  • +
  • Understanding Streamflow — USGS explainer
  • +
+
+ + {/* Wildfire */} + + What You're Looking At +

+ MeshAI tracks active wildfire perimeters from the National Interagency Fire Center (NIFC). For each fire, you see the name, size, how much is contained, and how far it is from your mesh nodes. +

+ + Fire Size — How Big Is It? + +

For reference, 1,000 acres is about 1.5 square miles.

+ + Containment — Is It Under Control? +

+ Containment means the percentage of the fire's edge where firefighters have built a control line (a cleared strip to stop the fire from spreading further). It does NOT mean the fire is out inside that line. +

+
    +
  • 0-30% — Essentially uncontrolled. The fire goes where it wants.
  • +
  • 50% — Good progress, but half the edge can still grow.
  • +
  • 80%+ — Well controlled. Major growth unlikely.
  • +
  • 100% — The edge is fully controlled. But the fire may STILL be actively burning inside. "100% contained" does NOT mean "out."
  • +
+ + How Far Away Should I Worry? + Under 5 km (3 miles), <>Immediate threat. This is evacuation-order range. Embers can fly this far in wind.], + [<> 5-15 km (3-10 miles), <>Prepare. The fire could reach you in hours under bad conditions. Have a plan.], + [<> 15-30 km (10-20 miles), <>Watch. Smoke is likely. Wind shifts could change things fast.], + [<> Over 30 km (20 miles), <>Awareness. Keep an eye on it, but no immediate threat.], + ]} + /> +

+ How fast can a fire travel? In grass with wind: up to 14 mph. In heavy timber: 1-6 mph. A fire 10 miles away could theoretically reach you in 1-2 hours under worst-case conditions, but typical spread is much slower. +

+ + Which Matters More — Size or Distance? +

+ Distance is the immediate concern. A small uncontained fire 10 km away is more dangerous right now than a huge fire 50 km away. But big fires have more energy and can grow fast under wind shifts — keep watching them. +

+ + Setting It Up +

+ Just configure your state code (like US-ID for Idaho) in Config → Environmental → Fires. MeshAI polls NIFC every 10 minutes for active fires in that state and computes the distance to your mesh nodes automatically. +

+ + Learn More +
    +
  • InciWeb — detailed incident information
  • +
  • NIFC Fire Map — raw perimeter data
  • +
  • Ready.gov Wildfires — preparedness guide
  • +
+
+ + {/* FIRMS */} + + What You're Looking At +

+ NASA's VIIRS satellites orbit the Earth and look for heat signatures on the ground. When they see something hot — a fire, a factory, a sunlit building — they flag it as a "hotspot." MeshAI checks these detections for your area. +

+

+ Why this matters: satellite hotspots show up hours before official fire perimeters are mapped. If a new fire starts near your mesh, the satellite might see it before anyone on the ground reports it. +

+ + Confidence — Is It Really a Fire? +

Each detection gets a confidence rating:

+ +

+ Recommendation: Set the filter to "Nominal + High." If you include "Low" you'll get alerts for every hot parking lot on a summer day. +

+ + FRP — How Intense Is It? +

FRP (Fire Radiative Power) measures the heat output in megawatts. Think of it as "how hot is this thing":

+ +

Setting the minimum FRP to 5 MW filters out most industrial and agricultural false alarms.

+ + New Ignition Detection +

+ MeshAI cross-references satellite hotspots against known NIFC fire perimeters. If a hotspot is NOT near any known fire, it gets flagged as a potential new ignition — maybe a new fire just started. These get elevated priority regardless of confidence level. +

+ + Timing +

+ Satellite data arrives 1-3 hours after the satellite passes overhead. Each location gets observed about 6 times per day across all satellites, so there are multi-hour gaps. This is not real-time — it's "pretty recent." +

+ + Getting an API Key +
    +
  1. Go to FIRMS API page
  2. +
  3. Click "Get MAP_KEY"
  4. +
  5. Register for a free Earthdata account
  6. +
  7. Your key arrives by email
  8. +
  9. Enter it in Config → Environmental → FIRMS
  10. +
+ + Learn More +
    +
  • FIRMS Fire Map — see hotspots on a map
  • +
  • FIRMS FAQ — how it works
  • +
+
+ + {/* Weather Alerts */} + + What You're Looking At +

+ MeshAI watches for NWS (National Weather Service) alerts affecting your area — warnings, watches, and advisories. +

+ + Alert Severity — How Serious Is It? + + + When Should I Act? (Urgency) + + + How Sure Are They? (Certainty) + + + These Are Separate Scales +

+ A single alert has all three. A hurricane warning for next week is "Severe + Future + Likely." A tornado spotted on the ground is "Extreme + Immediate + Observed." An air quality advisory is "Minor + Expected + Possible." +

+ + What Minimum Severity Should I Set? + Moderate ✓, 'Watches, Advisories, and Warnings', 'Special Weather Statements'], + ['Severe', 'Only Warnings — things happening NOW', 'Watches (which give you hours of advance warning)'], + ['Extreme', 'Only the rarest events', 'Most Tornado and Severe Thunderstorm Warnings'], + ]} + /> +

+ Moderate is recommended. It catches Watches (advance warning that conditions may worsen) and Advisories (conditions exist but aren't severe) while filtering out the informational stuff. +

+ + Finding Your NWS Zone +
    +
  1. Go to weather.gov
  2. +
  3. Enter your location
  4. +
  5. Find your zone code at NWS Zone Map
  6. +
  7. Zone codes look like: IDZ016, UTZ040, etc.
  8. +
+ + The User-Agent Field +

+ NWS wants to know who's using their API — not for approval, just so they can contact you if something breaks. You make it up: +

+

(meshai, you@email.com)

+

No registration. No waiting. Just type it in.

+ + Learn More +
    +
  • NWS Active Alerts — see current alerts
  • +
  • NWS API Docs — technical details
  • +
+
+ + {/* Solar & Geomagnetic */} + + What You're Looking At +

+ MeshAI tracks space weather — solar activity and its effects on Earth's magnetic field. This matters for radio operators because the sun directly controls how well HF radio works, and major solar events can affect all radio communications. +

+ + Solar Flux Index (SFI) +

Think of SFI as a "how active is the sun" number. Higher = better for HF radio, but also higher risk of solar flares.

+ +

Quick rule: SFI above 90 and Kp below 4 = good day for HF radio.

+ + Kp Index +

Kp measures how disturbed Earth's magnetic field is, on a 0-9 scale. Higher = more disturbance = worse for HF radio but better for aurora viewing.

+ 5, <>Minor storm (G1). HF noticeably degraded. Aurora visible at high latitudes (~60°N).], + [6, <>Moderate storm (G2). HF getting rough. Aurora moving south (~55°N).], + [7, <>Strong storm (G3). HF unreliable for 1-2 days. Aurora at mid-latitudes.], + [8-9, <>Severe/Extreme storm. HF may black out completely. Aurora visible at very low latitudes. Power grid stress possible.], + ]} + /> + + R / S / G Scales +

NOAA's shorthand for three types of space weather events:

+ R (Radio Blackouts) — from solar flares: +
    +
  • R1-R2: Brief HF disruption. You might not notice.
  • +
  • R3: HF goes out for about an hour on the sunlit side of Earth.
  • +
  • R4-R5: HF dead for hours. Serious.
  • +
+ S (Solar Radiation Storms) — from energetic particles: +
    +
  • Mostly affects polar regions and satellites
  • +
  • S3+: Polar HF goes out entirely
  • +
+ G (Geomagnetic Storms) — from solar wind disturbances: +
    +
  • Same as the Kp scale: G1 = Kp 5, up to G5 = Kp 9
  • +
+ + Bz — The Storm Predictor +

+ Bz measures the direction of the solar wind's magnetic field. When it points south (negative values), the solar wind can dump energy into Earth's magnetic field, causing storms. +

+ +

Bz can change fast — minute to minute. What matters is whether it stays negative for hours, not brief dips.

+ + Learn More +
    +
  • SWPC Space Weather Dashboard — live data
  • +
  • NOAA Space Weather Scales — what R/S/G mean
  • +
  • HamQSL Solar Page — ham-friendly display
  • +
  • Planetary K-Index — live Kp
  • +
+
+ + {/* Tropospheric Ducting */} + + What You're Looking At +

+ Sometimes the atmosphere creates an invisible "pipe" that traps radio signals and carries them much farther than normal. This is called tropospheric ducting. It mostly affects VHF and UHF frequencies. +

+

+ MeshAI watches for these conditions by analyzing weather data (temperature and humidity at different altitudes) over your mesh area. +

+ + How Do I Know If Ducting Is Happening? +

MeshAI reports a "condition" based on the atmospheric profile:

+ + + What You'll Actually Notice +

When ducting happens on your mesh:

+
    +
  • Distant repeaters you've never heard suddenly come in
  • +
  • Nodes appear from far outside your normal range
  • +
  • You hear FM radio stations from other cities
  • +
  • ADS-B flight tracking range gets much longer
  • +
  • There might be interference from distant stations on your frequency
  • +
+ + The dM/dz Number +

The dashboard shows a "dM/dz" value in "M-units/km." You don't need to understand the math — just know:

+
    +
  • Around 118 = normal atmosphere
  • +
  • Below 79 = enhanced propagation starting
  • +
  • Below 0 (negative) = ducting is happening
  • +
  • Below -50 = strong ducting — classic VHF/UHF DX event
  • +
+ + When Does Ducting Happen? +
    +
  • Under high-pressure weather systems (clear, stable air)
  • +
  • When warm air sits on top of cool air (temperature inversion)
  • +
  • Most common in late summer and early fall
  • +
  • Strongest along coastlines and over water
  • +
  • In mountain valleys: cold air pooling in fall/winter can create surface ducts
  • +
+ + Setting It Up +

+ Just configure the latitude and longitude of the center of your mesh area in Config → Environmental → Ducting. MeshAI checks the atmospheric conditions there every 3 hours using free weather model data. No API key needed. +

+ + Learn More +
    +
  • Tropo Forecast Maps (Hepburn) — 6-day tropo prediction
  • +
  • DX Maps — real-time VHF/UHF propagation reports
  • +
  • Wikipedia: Tropospheric Propagation — background
  • +
+
+ + {/* Avalanche Danger */} + + What You're Looking At +

+ MeshAI pulls avalanche forecasts from your regional avalanche center during winter months. The danger scale has 5 levels and it's the same across all of North America. +

+ + The Danger Scale + , 'Generally safe. Normal caution in steep terrain.'], + ['2', 'Moderate', , 'Be careful on specific terrain features. Evaluate conditions.'], + ['3', 'Considerable', , <>DANGEROUS. This is where most people die in avalanches — they see "3 out of 5" and think it's fine. It's not. Use extreme caution.], + ['4', 'High', , <>Very dangerous. Stay off anything steep.], + ['5', 'Extreme', , <>Don't go out. Avalanches are happening on their own.], + ]} + /> + + The Most Important Thing to Know +

+ Level 3 (Considerable) kills more people than any other level. People look at "3 out of 5" and think "middle of the road, probably okay." In reality, the risk roughly doubles at each step up the scale. Level 3 is where dangerous conditions overlap with people thinking they can handle it. +

+ + Seasonal +

+ MeshAI only checks avalanche conditions during winter months (configurable, default December through April). Outside season, it shows "off season" and saves API calls. +

+ + Finding Your Avalanche Center +

+ Go to avalanche.org/avalanche-centers/ for a map. Common center codes: +

+
    +
  • SNFAC — Sawtooth (central Idaho)
  • +
  • UAC — Utah
  • +
  • NWAC — Cascades/Olympics (WA/OR)
  • +
  • CAIC — Colorado
  • +
  • SAC — Sierra Nevada (CA)
  • +
  • GNFAC — Gallatin (SW Montana)
  • +
+ + Learn More +
    +
  • Avalanche.org — US forecasts
  • +
  • Avalanche Danger Scale — full scale explanation
  • +
  • Know Before You Go — avalanche awareness
  • +
+
+ + {/* Traffic Flow */} + + What You're Looking At +

+ MeshAI monitors traffic speed on road segments you configure, using data from TomTom (real vehicles with navigation apps reporting their speed). +

+ + Speed Ratio — The Key Number +

MeshAI compares current speed to "free-flow speed" (what traffic normally does when the road is empty). The ratio tells you how congested it is:

+ Above 85%, 'Normal. Traffic flowing fine.'], + [<> 65-85%, 'Slow. Heavier than usual but moving.'], + [<> 40-65%, 'Congested. Significant delays.'], + [<> Below 40%, 'Gridlock. Barely moving.'], + ]} + /> +

+ Note: "free-flow speed" is NOT the speed limit. It's what traffic actually does on that road when nobody's in the way. Drivers often exceed speed limits on open highways. +

+ + Confidence — Can You Trust the Data? +

TomTom's confidence score tells you how much of the reading comes from real vehicles right now vs historical averages:

+ Unreliable — mostly guessing from historical patterns. Don't alert on this.], + ]} + /> +

Set minimum confidence to 0.7 to avoid false congestion alerts at night or on rural roads where few probe vehicles drive.

+ + Setting Up Corridors +

Each "corridor" is a point on a road you want to monitor. To add one:

+
    +
  1. Go to Google Maps, find the road
  2. +
  3. Right-click the road → "What's here?" → copy the coordinates
  4. +
  5. Add the corridor in Config with a name and those coordinates
  6. +
  7. TomTom finds the nearest road segment automatically
  8. +
+ + Getting an API Key +
    +
  1. Sign up at developer.tomtom.com (free)
  2. +
  3. Create an app → get your API key
  4. +
  5. Free tier: 2,500 requests/day (plenty for 5-10 corridors)
  6. +
+ + Learn More +
    +
  • TomTom Developer Portal — API docs and key signup
  • +
  • TomTom Traffic Index — city congestion rankings
  • +
+
+ + {/* 511 Road Conditions */} + + What You're Looking At +

+ 511 systems report road closures, construction, weather events, mountain pass conditions, and incidents. Every state runs their own 511 system — there is no national API. +

+ + Setting It Up +

+ You need to find YOUR state's 511 developer API. MeshAI does not include a default URL because every state is different. Some states have free public APIs, some require registration, and some don't have developer APIs at all. +

+

Configure in Config → Environmental → 511:

+
    +
  • Base URL — your state's API endpoint
  • +
  • API Key — if required by your state
  • +
  • Endpoints — which data feeds to poll (varies by state)
  • +
+ + Learn More +

Check your state's 511 or DOT website for developer information.

+
+ + {/* Mesh Health */} + + Health Score +

MeshAI computes a 0-100 health score for your mesh network by looking at five areas:

+ + + Health Tiers + Healthy, "Everything's working well."], + ['75-89', <> Slight degradation, 'Some issues but the mesh is functional.'], + ['50-74', <> Unhealthy, 'Multiple problems. Reliability is affected.'], + ['25-49', <> Warning, 'Significant issues. The mesh is struggling.'], + ['0-24', <> Critical, 'Major failures. Barely functional.'], + ]} + /> + + Channel Utilization — Is the Radio Channel Full? +

+ Meshtastic radios share one LoRa channel. If too many nodes are transmitting too often, they step on each other and messages get lost. +

+ Under 25%, 'Healthy. The firmware itself starts throttling above 25% to protect the channel — so under 25% is the target.'], + [<> 25-40%, 'Getting busy. Common on larger meshes. Worth watching.'], + [<> 40-50%, 'Congested. The firmware throttles GPS updates above 40%. Messages are colliding and retrying.'], + [<> Over 50%, 'Serious problem. More time is spent retrying than communicating. Mesh reliability drops fast.'], + [<> Over 65%, 'Documented failure point on busy LONG_FAST meshes. The mesh becomes unusable.'], + ]} + /> + + Packet Flooding +

+ ⚠️ "Packet flooding" means a node sending too many RADIO PACKETS. This has nothing to do with water flooding. +

+

+ A normal Meshtastic node sends a packet every few minutes (announcing itself, reporting telemetry, updating position). If a node starts blasting packets every few seconds, something is wrong — firmware bug, stuck transmitter, or misconfiguration. +

+ + + Battery Levels +

+ Most Meshtastic radios (T-Beam, RAK4631, Heltec V3) use a single lithium battery cell. The voltage tells you how much charge is left: +

+ 3.60V, ~30%, <>⚠️ Warning — charge it soon], + [3.50V, ~15%, <>🔴 Low — charge it now], + [3.40V, ~7%, <>⚫ About to die], + ['3.30V', '~3%', 'Device shutting down'], + ]} + /> +

+ USB-powered nodes report 100% battery even if there's no battery installed. Battery alerts only matter for nodes actually running on battery power. +

+ + Node Offline Detection +

+ MeshAI marks a node as "offline" when it hasn't been heard for a configurable time period. Different node types need different thresholds: +

+ 2 hours, 'These should always be transmitting. 2 hours of silence means something is wrong.'], + ['Fixed client (wall power)', '2-4 hours', 'Same logic, slightly more lenient.'], + ['Mobile / vehicle', '4-8 hours', 'They go behind mountains, into garages, out of range. Normal.'], + ['Solar-powered', '12-24 hours', 'May shut down at night when solar stops charging.'], + ]} + /> +

+ Rule of thumb: set the threshold to about 4× the node's beacon interval. Too tight and nodes will constantly flap "offline/online" from normal gaps. Too loose and real outages go unnoticed. +

+
+ + {/* Notifications */} + + How It Works +
    +
  1. Something happens — a fire is detected, weather warning issued, node goes offline, etc.
  2. +
  3. MeshAI checks your rules — does this event match any of your notification rules? Is it severe enough? Are we in quiet hours?
  4. +
  5. If a rule matches — MeshAI sends the notification through whatever delivery method that rule is configured for.
  6. +
+ + Building Rules +

Each rule answers three questions:

+
    +
  • WHEN does it trigger? (which categories, what severity)
  • +
  • WHERE does it send? (mesh broadcast, email, webhook, etc.)
  • +
  • HOW OFTEN at most? (cooldown period)
  • +
+

+ Use "Add from Template" to start with a pre-built rule and customize it, or build from scratch with "Add Rule." +

+ + Severity Levels — What Should I Set? + Warning ✓, 'Take action (fire within 15km, severe weather, critical battery)', 'Low — recommended for most rules'], + ['Emergency', 'Life safety (extreme weather, fire at infrastructure, total blackout)', 'Very rare'], + ]} + /> +

+ "Warning" is the sweet spot for most rules. You get alerted when something actually needs your attention without being overwhelmed by every minor event. +

+ + Quiet Hours +

+ When enabled, non-emergency notifications are held during sleeping hours (default 10pm-6am). Emergency alerts and rules marked "Override Quiet Hours" always get through. +

+

You can turn quiet hours off entirely if you don't want them.

+ + Webhook — The Swiss Army Knife +

+ A webhook sends your alert as an HTTP POST to any URL. This one delivery method works with: +

+
    +
  • Discord — use a Discord webhook URL
  • +
  • Slack — use a Slack incoming webhook URL
  • +
  • ntfy.sh — POST to https://ntfy.sh/your-topic
  • +
  • Pushover — POST to the Pushover API
  • +
  • Home Assistant — POST to an automation webhook URL
  • +
  • Anything else that accepts HTTP POST
  • +
+

+ MeshAI doesn't need to know what's on the other end. Give it the URL and it works. +

+
+ + {/* Commands */} + +

+ All commands use the ! prefix (configurable). Send these as a direct message to MeshAI on your mesh. +

+ + Basic Commands + !help, 'Shows all available commands'], + [!ping, 'Tests if the bot is alive'], + [!status, 'Quick mesh summary (nodes online, health score)'], + [!health, 'Detailed health report with pillar scores'], + [!weather, 'Current weather for your area'], + ]} + /> + + Environmental Commands + !alerts, 'Active NWS weather alerts for your area'], + [<>!solar (or !hf), 'Current solar indices and RF conditions'], + [!fire, 'Active wildfires near your mesh'], + [!avy, 'Avalanche advisory (seasonal — shows "off season" in summer)'], + [<>!streams (or !gauges), 'Stream gauge readings'], + [<>!roads (or !traffic), 'Road conditions and traffic flow'], + [!hotspots, 'Satellite fire detections'], + ]} + /> + + Subscription Commands + !subscribe, 'Lists all alert categories you can subscribe to'], + [!subscribe fire_proximity, 'Subscribe to a specific category'], + [!subscribe all, 'Subscribe to everything'], + [!unsubscribe fire_proximity, 'Unsubscribe from a category'], + [!subscriptions, "Shows what you're currently subscribed to"], + ]} + /> + + Conversational +

+ MeshAI isn't just commands — you can ask it questions in plain English. "How's the mesh doing?" "Is there any ducting?" "What's the fire situation?" "How's traffic on I-84?" It uses the live environmental data and mesh health data to answer. +

+
+ + {/* API Reference */} + +

+ MeshAI's REST API is available at http://your-host:8080. All endpoints return JSON. +

+ + System +
    +
  • GET /api/status — version, uptime, node count
  • +
  • GET /api/channels — radio channel list
  • +
  • POST /api/restart — restart the bot
  • +
+ + Mesh Data +
    +
  • GET /api/health — health score and pillars
  • +
  • GET /api/nodes — all nodes with positions and telemetry
  • +
  • GET /api/edges — neighbor links with signal quality
  • +
  • GET /api/regions — region summaries
  • +
  • GET /api/sources — data source health
  • +
+ + Configuration +
    +
  • GET /api/config — full config
  • +
  • GET /api/config/{'{section}'} — one section
  • +
  • PUT /api/config/{'{section}'} — update a section
  • +
+ + Environmental +
    +
  • GET /api/env/status — per-feed health
  • +
  • GET /api/env/active — all active events
  • +
  • GET /api/env/swpc — solar/geomagnetic data
  • +
  • GET /api/env/ducting — atmospheric profile
  • +
  • GET /api/env/fires — wildfire perimeters
  • +
  • GET /api/env/hotspots — satellite fire detections
  • +
+ + Alerts +
    +
  • GET /api/alerts/active — current alerts
  • +
  • GET /api/alerts/history — past alerts
  • +
  • GET /api/notifications/categories — available alert categories
  • +
+ + Real-time +
    +
  • ws://your-host:8080/ws/live — WebSocket for live updates
  • +
+
+ +
+
+
+ ) +} diff --git a/meshai/commands/roads_cmd.py b/meshai/commands/roads_cmd.py new file mode 100644 index 0000000..2d3b162 --- /dev/null +++ b/meshai/commands/roads_cmd.py @@ -0,0 +1,74 @@ +"""Road conditions command.""" + +from .base import CommandContext, CommandHandler + + +class RoadsCommand(CommandHandler): + """Show traffic flow and road conditions.""" + + aliases = ["traffic", "highways"] + + def __init__(self, env_store): + self._env_store = env_store + self._name = "roads" + + @property + def name(self) -> str: + return self._name + + @name.setter + def name(self, value: str): + self._name = value + + @property + def description(self) -> str: + return "Show traffic flow and road conditions" + + @property + def usage(self) -> str: + return "!roads" + + async def execute(self, args: str, context: CommandContext) -> str: + if not self._env_store: + return "Environmental feeds not configured." + + traffic_events = self._env_store.get_active(source="traffic") + road_events = self._env_store.get_active(source="511") + + if not traffic_events and not road_events: + return "No traffic or road data available. Check if sources are configured." + + lines = [] + + # Traffic flow from TomTom + if traffic_events: + lines.append("Traffic Flow:") + for event in traffic_events: + props = event.get("properties", {}) + corridor = props.get("corridor", "Unknown") + current = props.get("currentSpeed", 0) + free_flow = props.get("freeFlowSpeed", 0) + ratio = props.get("speedRatio", 1.0) + closure = props.get("roadClosure", False) + + if closure: + lines.append(f" {corridor}: CLOSED") + else: + pct = int(ratio * 100) + lines.append(f" {corridor}: {int(current)}mph ({pct}% of {int(free_flow)}mph)") + + # 511 road events + if road_events: + if traffic_events: + lines.append("") # Separator + lines.append("Road Events:") + for event in road_events: + event_type = event.get("event_type", "Event") + headline = event.get("headline", "")[:80] + props = event.get("properties", {}) + is_closure = props.get("is_closure", False) + + icon = "X" if is_closure else "-" + lines.append(f" {icon} {headline}") + + return "\n".join(lines) if lines else "No road conditions data." diff --git a/meshai/commands/streams_cmd.py b/meshai/commands/streams_cmd.py new file mode 100644 index 0000000..22b5d55 --- /dev/null +++ b/meshai/commands/streams_cmd.py @@ -0,0 +1,73 @@ +"""Stream gauge command.""" + +from .base import CommandContext, CommandHandler + + +class StreamsCommand(CommandHandler): + """Show current stream gauge readings.""" + + aliases = ["gauges", "rivers"] + + def __init__(self, env_store): + self._env_store = env_store + self._name = "streams" + + @property + def name(self) -> str: + return self._name + + @name.setter + def name(self, value: str): + self._name = value + + @property + def description(self) -> str: + return "Show stream gauge readings" + + @property + def usage(self) -> str: + return "!streams" + + async def execute(self, args: str, context: CommandContext) -> str: + if not self._env_store: + return "Environmental feeds not configured." + + events = self._env_store.get_active(source="usgs") + + if not events: + return "No stream gauge data available. Check if USGS sites are configured." + + lines = [] + + # Group by site + sites = {} + for event in events: + props = event.get("properties", {}) + site_id = props.get("site_id", "") + site_name = props.get("site_name", "Unknown") + + if site_id not in sites: + sites[site_id] = {"name": site_name, "readings": []} + + param = props.get("parameter", "") + value = props.get("value", 0) + unit = props.get("unit", "") + + sites[site_id]["readings"].append((param, value, unit)) + + for site_id, data in sites.items(): + name = data["name"] + readings = data["readings"] + + # Format readings + parts = [] + for param, value, unit in readings: + if "flow" in param.lower() or unit == "ft3/s": + parts.append(f"{value:,.0f} {unit}") + else: + parts.append(f"{value:.1f} {unit}") + + reading_str = ", ".join(parts) + lines.append(f"{name}: {reading_str}") + + return "\n".join(lines) if lines else "No stream gauge readings." diff --git a/meshai/commands/subscribe.py b/meshai/commands/subscribe.py index 5dbe705..36db916 100644 --- a/meshai/commands/subscribe.py +++ b/meshai/commands/subscribe.py @@ -8,6 +8,7 @@ if TYPE_CHECKING: from ..mesh_data_store import MeshDataStore from ..mesh_reporter import MeshReporter from ..subscriptions import SubscriptionManager + from ..notifications.router import NotificationRouter class SubCommand(CommandHandler): @@ -15,7 +16,7 @@ class SubCommand(CommandHandler): name = "sub" description = "Subscribe to reports or alerts" - usage = "!sub daily|weekly|alerts [time] [day] [scope]" + usage = "!sub daily|weekly|alerts| [time] [day] [scope]" aliases = ["subscribe"] def __init__( @@ -23,23 +24,35 @@ class SubCommand(CommandHandler): subscription_manager: "SubscriptionManager" = None, mesh_reporter: "MeshReporter" = None, data_store: "MeshDataStore" = None, + notification_router: "NotificationRouter" = None, ): self._sub_manager = subscription_manager self._reporter = mesh_reporter self._data_store = data_store + self._notification_router = notification_router async def execute(self, args: str, context: CommandContext) -> str: """Handle subscription command.""" - if not self._sub_manager: - return "Subscriptions not available." - parts = args.strip().split() + + # No args - show available alert categories if not parts: - return self._usage_help() + return self._show_categories() sub_type = parts[0].lower() + + # Check if it's a category subscription + if self._notification_router: + from ..notifications.categories import ALERT_CATEGORIES + if sub_type in ALERT_CATEGORIES or sub_type == "all": + return self._handle_category_subscription(sub_type, context) + + # Legacy subscription types if sub_type not in ("daily", "weekly", "alerts"): - return f"Invalid type '{sub_type}'. Use: daily, weekly, or alerts" + return self._show_categories() + + if not self._sub_manager: + return "Subscriptions not available." try: if sub_type == "daily": @@ -51,15 +64,55 @@ class SubCommand(CommandHandler): except ValueError as e: return f"Error: {e}" + def _show_categories(self) -> str: + """Show available alert categories.""" + try: + from ..notifications.categories import ALERT_CATEGORIES + except ImportError: + return self._usage_help() + + lines = ["Available alert categories:"] + for cat_id, cat_info in ALERT_CATEGORIES.items(): + lines.append(f" {cat_id} - {cat_info['description']}") + lines.append("") + lines.append("Usage:") + lines.append(" !sub - subscribe to a category") + lines.append(" !sub all - subscribe to all alerts") + lines.append(" !sub alerts - legacy mesh-wide alerts") + + return "\n".join(lines) + + def _handle_category_subscription(self, category: str, context: CommandContext) -> str: + """Handle category-based alert subscription.""" + node_id = self._get_user_id(context) + + if category == "all": + categories = [] # Empty = all categories + else: + categories = [category] + + # Add subscription via notification router + rule_name = self._notification_router.add_mesh_subscription( + node_id=node_id, + categories=categories, + ) + + if category == "all": + return "Subscribed to all alert categories. Use !unsub to remove." + else: + from ..notifications.categories import get_category + cat_info = get_category(category) + return f"Subscribed to {cat_info['name']} alerts. Use !unsub {category} to remove." + def _usage_help(self) -> str: """Return usage help.""" return """Usage: !sub daily 1830 - daily mesh report at 6:30 PM !sub daily 1830 region SCID - daily region report -!sub daily 1830 node MHR - daily node report !sub weekly 0800 sun - weekly digest Sunday 8 AM -!sub alerts - mesh-wide alerts -!sub alerts region SCID - alerts for a region""" +!sub alerts - mesh-wide alerts (legacy) +!sub - subscribe to alert category +!sub all - subscribe to all alerts""" def _handle_daily(self, args: list, context: CommandContext) -> str: """Handle daily subscription.""" @@ -68,11 +121,9 @@ class SubCommand(CommandHandler): schedule_time = args[0] scope_type, scope_value = self._parse_scope(args[1:]) - - # Validate scope scope_value = self._validate_scope(scope_type, scope_value) - result = self._sub_manager.add( + self._sub_manager.add( user_id=self._get_user_id(context), sub_type="daily", schedule_time=schedule_time, @@ -92,11 +143,9 @@ class SubCommand(CommandHandler): schedule_time = args[0] schedule_day = args[1].lower() scope_type, scope_value = self._parse_scope(args[2:]) - - # Validate scope scope_value = self._validate_scope(scope_type, scope_value) - result = self._sub_manager.add( + self._sub_manager.add( user_id=self._get_user_id(context), sub_type="weekly", schedule_time=schedule_time, @@ -111,13 +160,11 @@ class SubCommand(CommandHandler): return f"Subscribed: weekly {scope_desc}report at {time_fmt} {day_fmt}" def _handle_alerts(self, args: list, context: CommandContext) -> str: - """Handle alerts subscription.""" + """Handle alerts subscription (legacy).""" scope_type, scope_value = self._parse_scope(args) - - # Validate scope scope_value = self._validate_scope(scope_type, scope_value) - result = self._sub_manager.add( + self._sub_manager.add( user_id=self._get_user_id(context), sub_type="alerts", scope_type=scope_type, @@ -128,15 +175,10 @@ class SubCommand(CommandHandler): return f"Subscribed: alerts for {scope_desc.strip() or 'mesh'}" def _parse_scope(self, args: list) -> tuple[str, str]: - """Parse scope from remaining args. - - Returns: - (scope_type, scope_value) tuple - """ + """Parse scope from remaining args.""" if not args: return "mesh", None - # Look for 'region' or 'node' keyword scope_type = "mesh" scope_value = None @@ -144,26 +186,17 @@ class SubCommand(CommandHandler): arg_lower = arg.lower() if arg_lower == "region": scope_type = "region" - # Everything after 'region' is the region name scope_value = " ".join(args[i + 1:]) if i + 1 < len(args) else None break elif arg_lower == "node": scope_type = "node" - # Next arg is the node identifier scope_value = args[i + 1] if i + 1 < len(args) else None break return scope_type, scope_value def _validate_scope(self, scope_type: str, scope_value: str) -> str: - """Validate and resolve scope value. - - Returns: - Resolved scope_value (e.g., full region name) - - Raises: - ValueError: If scope not found - """ + """Validate and resolve scope value.""" if scope_type == "mesh": return None @@ -172,14 +205,9 @@ class SubCommand(CommandHandler): if scope_type == "region" and self._reporter: region = self._reporter._find_region(scope_value) - if not region: - # List available regions - health = self._reporter.health_engine.mesh_health - if health: - available = [r.name for r in health.regions if r.node_ids] - return scope_value # Use as-is, will fail at delivery if invalid - raise ValueError(f"Region '{scope_value}' not found") - return region.name # Return canonical name + if region: + return region.name + return scope_value if scope_type == "node" and self._reporter: node = self._reporter._find_node(scope_value) @@ -191,7 +219,6 @@ class SubCommand(CommandHandler): def _get_user_id(self, context: CommandContext) -> str: """Extract user ID from context.""" - # sender_id is like "!abcd1234" - convert to node_num sender_id = context.sender_id if sender_id.startswith("!"): return str(int(sender_id[1:], 16)) @@ -217,26 +244,40 @@ class UnsubCommand(CommandHandler): name = "unsub" description = "Remove subscription(s)" - usage = "!unsub daily|weekly|alerts|all" + usage = "!unsub daily|weekly|alerts||all" aliases = ["unsubscribe"] - def __init__(self, subscription_manager: "SubscriptionManager" = None): + def __init__( + self, + subscription_manager: "SubscriptionManager" = None, + notification_router: "NotificationRouter" = None, + ): self._sub_manager = subscription_manager + self._notification_router = notification_router async def execute(self, args: str, context: CommandContext) -> str: """Handle unsubscribe command.""" - if not self._sub_manager: - return "Subscriptions not available." - sub_type = args.strip().lower() if args else None if not sub_type: - return "Usage: !unsub daily|weekly|alerts|all" - - if sub_type not in ("daily", "weekly", "alerts", "all"): - return f"Invalid type '{sub_type}'. Use: daily, weekly, alerts, or all" + return "Usage: !unsub daily|weekly|alerts||all" user_id = self._get_user_id(context) + + # Check if it's a category unsubscription + if self._notification_router: + from ..notifications.categories import ALERT_CATEGORIES + if sub_type in ALERT_CATEGORIES or sub_type == "all": + self._notification_router.remove_mesh_subscription(user_id) + return "Removed alert subscriptions" + + # Legacy subscription types + if not self._sub_manager: + return "Subscriptions not available." + + if sub_type not in ("daily", "weekly", "alerts", "all"): + return f"Invalid type '{sub_type}'. Use: daily, weekly, alerts, , or all" + removed = self._sub_manager.remove(user_id, sub_type if sub_type != "all" else None) if removed == 0: @@ -260,26 +301,44 @@ class MySubsCommand(CommandHandler): name = "mysubs" description = "List your subscriptions" usage = "!mysubs" - aliases = ["subs"] + aliases = ["subs", "subscriptions"] - def __init__(self, subscription_manager: "SubscriptionManager" = None): + def __init__( + self, + subscription_manager: "SubscriptionManager" = None, + notification_router: "NotificationRouter" = None, + ): self._sub_manager = subscription_manager + self._notification_router = notification_router async def execute(self, args: str, context: CommandContext) -> str: """List user's subscriptions.""" - if not self._sub_manager: - return "Subscriptions not available." - user_id = self._get_user_id(context) - subs = self._sub_manager.get_user_subs(user_id) + lines = [] - if not subs: + # Check notification router subscriptions + if self._notification_router: + categories = self._notification_router.get_node_subscriptions(user_id) + if categories: + if categories == ["all"]: + lines.append("Alert subscriptions: all categories") + else: + lines.append(f"Alert subscriptions: {', '.join(categories)}") + + # Check legacy subscriptions + if self._sub_manager: + subs = self._sub_manager.get_user_subs(user_id) + if subs: + if not lines: + lines.append("Your subscriptions:") + else: + lines.append("\nScheduled reports:") + for i, sub in enumerate(subs, 1): + lines.append(f" {i}. {self._format_sub(sub)}") + + if not lines: return "No active subscriptions. Use !sub to subscribe." - lines = ["Your subscriptions:"] - for i, sub in enumerate(subs, 1): - lines.append(f" {i}. {self._format_sub(sub)}") - return "\n".join(lines) def _format_sub(self, sub: dict) -> str: @@ -301,7 +360,7 @@ class MySubsCommand(CommandHandler): time_str = self._format_time(sub.get("schedule_time", "0000")) day_str = (sub.get("schedule_day") or "").capitalize() return f"Weekly {scope_desc}report at {time_str} {day_str}" - else: # alerts + else: return f"Alerts for {scope_desc.strip() or 'mesh'}" def _format_time(self, hhmm: str) -> str: diff --git a/meshai/dashboard/api/notification_routes.py b/meshai/dashboard/api/notification_routes.py new file mode 100644 index 0000000..de30178 --- /dev/null +++ b/meshai/dashboard/api/notification_routes.py @@ -0,0 +1,113 @@ +"""Notification API routes.""" + +from fastapi import APIRouter, Request, HTTPException +from pydantic import BaseModel +from typing import Optional + +router = APIRouter(prefix="/notifications", tags=["notifications"]) + + +class ChannelCreate(BaseModel): + """Channel creation request.""" + id: str + type: str + enabled: bool = True + channel_index: int = 0 + node_ids: list[str] = [] + smtp_host: str = "" + smtp_port: int = 587 + smtp_user: str = "" + smtp_password: str = "" + smtp_tls: bool = True + from_address: str = "" + recipients: list[str] = [] + url: str = "" + headers: dict = {} + + +class RuleCreate(BaseModel): + """Rule creation request.""" + name: str + categories: list[str] = [] + min_severity: str = "warning" + channel_ids: list[str] = [] + override_quiet: bool = False + + +class QuietHoursUpdate(BaseModel): + """Quiet hours update request.""" + start: str + end: str + + +@router.get("/categories") +async def get_categories(): + """Get all alert categories with descriptions.""" + try: + from ...notifications.categories import list_categories + return list_categories() + except ImportError: + return [] + + +@router.get("/channels") +async def get_channels(request: Request): + """Get configured notification channels.""" + notification_router = getattr(request.app.state, "notification_router", None) + if not notification_router: + return [] + return notification_router.get_channels() + + +@router.post("/channels") +async def create_channel(request: Request, channel: ChannelCreate): + """Create a new notification channel.""" + # This would require runtime config modification + # For now, return not implemented + raise HTTPException(status_code=501, detail="Channel creation requires config file edit") + + +@router.post("/channels/{channel_id}/test") +async def test_channel(request: Request, channel_id: str): + """Send a test alert to a channel.""" + notification_router = getattr(request.app.state, "notification_router", None) + if not notification_router: + raise HTTPException(status_code=404, detail="Notification router not configured") + + success, message = await notification_router.test_channel(channel_id) + return {"success": success, "message": message} + + +@router.get("/rules") +async def get_rules(request: Request): + """Get configured notification rules.""" + notification_router = getattr(request.app.state, "notification_router", None) + if not notification_router: + return [] + return notification_router.get_rules() + + +@router.post("/rules") +async def create_rule(request: Request, rule: RuleCreate): + """Create a new notification rule.""" + # This would require runtime config modification + raise HTTPException(status_code=501, detail="Rule creation requires config file edit") + + +@router.get("/quiet-hours") +async def get_quiet_hours(request: Request): + """Get quiet hours configuration.""" + config = getattr(request.app.state, "config", None) + if not config or not hasattr(config, "notifications"): + return {"start": "22:00", "end": "06:00"} + return { + "start": config.notifications.quiet_hours_start, + "end": config.notifications.quiet_hours_end, + } + + +@router.put("/quiet-hours") +async def update_quiet_hours(request: Request, quiet_hours: QuietHoursUpdate): + """Update quiet hours configuration.""" + # This would require runtime config modification + raise HTTPException(status_code=501, detail="Quiet hours update requires config file edit") diff --git a/meshai/notifications/__init__.py b/meshai/notifications/__init__.py new file mode 100644 index 0000000..e0e5993 --- /dev/null +++ b/meshai/notifications/__init__.py @@ -0,0 +1,6 @@ +"""Notification system for MeshAI alerts.""" + +from .categories import ALERT_CATEGORIES, get_category, list_categories +from .router import NotificationRouter + +__all__ = ["ALERT_CATEGORIES", "get_category", "list_categories", "NotificationRouter"] diff --git a/meshai/notifications/categories.py b/meshai/notifications/categories.py new file mode 100644 index 0000000..e6762db --- /dev/null +++ b/meshai/notifications/categories.py @@ -0,0 +1,157 @@ +"""Alert category registry. + +Defines all alertable conditions with human-readable names and descriptions. +""" + +ALERT_CATEGORIES = { + # Infrastructure alerts + "infra_offline": { + "name": "Infrastructure Offline", + "description": "An infrastructure node stopped responding", + "default_severity": "warning", + }, + "critical_node_down": { + "name": "Critical Node Down", + "description": "A node marked as critical went offline", + "default_severity": "critical", + }, + "infra_recovery": { + "name": "Infrastructure Recovery", + "description": "An infrastructure node came back online", + "default_severity": "info", + }, + "new_router": { + "name": "New Router", + "description": "A new router appeared on the mesh", + "default_severity": "info", + }, + + # Power alerts + "battery_warning": { + "name": "Battery Warning", + "description": "Infrastructure node battery below warning threshold", + "default_severity": "warning", + }, + "battery_critical": { + "name": "Battery Critical", + "description": "Infrastructure node battery below critical threshold", + "default_severity": "critical", + }, + "battery_emergency": { + "name": "Battery Emergency", + "description": "Infrastructure node battery critically low", + "default_severity": "emergency", + }, + "battery_trend": { + "name": "Battery Declining", + "description": "Battery showing declining trend over 7 days", + "default_severity": "warning", + }, + "power_source_change": { + "name": "Power Source Change", + "description": "Node switched from USB to battery (possible outage)", + "default_severity": "warning", + }, + "solar_not_charging": { + "name": "Solar Not Charging", + "description": "Solar panel not charging during daylight hours", + "default_severity": "warning", + }, + + # Utilization alerts + "sustained_high_util": { + "name": "High Utilization", + "description": "Channel utilization elevated for extended period", + "default_severity": "warning", + }, + "packet_flood": { + "name": "Packet Flood", + "description": "Node sending excessive packets", + "default_severity": "warning", + }, + + # Coverage alerts + "infra_single_gateway": { + "name": "Single Gateway", + "description": "Infrastructure node dropped to single gateway coverage", + "default_severity": "warning", + }, + "feeder_offline": { + "name": "Feeder Offline", + "description": "A feeder gateway stopped responding", + "default_severity": "warning", + }, + "region_total_blackout": { + "name": "Region Blackout", + "description": "All infrastructure in a region is offline", + "default_severity": "emergency", + }, + + # Health score alerts + "mesh_score_low": { + "name": "Mesh Health Low", + "description": "Overall mesh health score below threshold", + "default_severity": "warning", + }, + "region_score_low": { + "name": "Region Health Low", + "description": "A region's health score below threshold", + "default_severity": "warning", + }, + + # Environmental alerts + "weather_warning": { + "name": "Severe Weather", + "description": "NWS warning or advisory for mesh area", + "default_severity": "warning", + }, + "hf_blackout": { + "name": "HF Radio Blackout", + "description": "R3+ solar event degrading HF propagation", + "default_severity": "warning", + }, + "tropospheric_ducting": { + "name": "Tropospheric Ducting", + "description": "Atmospheric conditions extending VHF/UHF range", + "default_severity": "info", + }, + "wildfire_proximity": { + "name": "Fire Near Mesh", + "description": "Wildfire detected within configured distance", + "default_severity": "warning", + }, + "new_ignition": { + "name": "New Fire Ignition", + "description": "Satellite hotspot not matching any known fire", + "default_severity": "warning", + }, + "flood_warning": { + "name": "Flood Warning", + "description": "Stream gauge exceeds flood threshold", + "default_severity": "warning", + }, + "road_closure": { + "name": "Road Closure", + "description": "Full road closure on monitored corridor", + "default_severity": "warning", + }, +} + + +def get_category(category_id: str) -> dict: + """Get category info by ID, with fallback for unknown categories.""" + if category_id in ALERT_CATEGORIES: + return ALERT_CATEGORIES[category_id] + return { + "name": category_id.replace("_", " ").title(), + "description": f"Alert type: {category_id}", + "default_severity": "info", + } + + +def list_categories() -> list[dict]: + """List all categories with their IDs.""" + return [ + {"id": cat_id, **cat_info} + for cat_id, cat_info in ALERT_CATEGORIES.items() + ] diff --git a/meshai/notifications/channels.py b/meshai/notifications/channels.py new file mode 100644 index 0000000..5042213 --- /dev/null +++ b/meshai/notifications/channels.py @@ -0,0 +1,308 @@ +"""Notification channel implementations.""" + +import asyncio +import logging +import smtplib +import ssl +import time +from abc import ABC, abstractmethod +from email.mime.text import MIMEText +from email.mime.multipart import MIMEMultipart +from typing import Optional, TYPE_CHECKING + +import httpx + +if TYPE_CHECKING: + from ..connector import MeshConnector + +logger = logging.getLogger(__name__) + + +class NotificationChannel(ABC): + """Base class for notification delivery channels.""" + + channel_type: str = "base" + + @abstractmethod + async def deliver(self, alert: dict, rule: dict) -> bool: + """Send alert. Returns True on success.""" + raise NotImplementedError + + @abstractmethod + async def test(self) -> tuple[bool, str]: + """Send test message. Returns (success, message).""" + raise NotImplementedError + + +class MeshBroadcastChannel(NotificationChannel): + """Post alert to mesh channel.""" + + channel_type = "mesh_broadcast" + + def __init__(self, connector: "MeshConnector", channel_index: int = 0): + self._connector = connector + self._channel = channel_index + + async def deliver(self, alert: dict, rule: dict) -> bool: + """Send alert to mesh channel.""" + if not self._connector: + logger.warning("No mesh connector available") + return False + + try: + message = alert.get("message", "") + self._connector.send_message( + text=message, + destination=None, + channel=self._channel, + ) + logger.info("Broadcast alert to channel %d", self._channel) + return True + except Exception as e: + logger.error("Failed to broadcast alert: %s", e) + return False + + async def test(self) -> tuple[bool, str]: + """Send test broadcast.""" + try: + self._connector.send_message( + text="[TEST] MeshAI notification system test", + destination=None, + channel=self._channel, + ) + return True, "Test message sent to channel %d" % self._channel + except Exception as e: + return False, "Failed to send test: %s" % e + + +class MeshDMChannel(NotificationChannel): + """DM alert to specific node IDs.""" + + channel_type = "mesh_dm" + + def __init__(self, connector: "MeshConnector", node_ids: list[str]): + self._connector = connector + self._node_ids = node_ids + + async def deliver(self, alert: dict, rule: dict) -> bool: + """Send alert via DM to configured nodes.""" + if not self._connector: + return False + + message = alert.get("message", "") + success = True + + for node_id in self._node_ids: + try: + dest = int(node_id) if node_id.isdigit() else node_id + self._connector.send_message(text=message, destination=dest, channel=0) + except Exception as e: + logger.error("Failed to DM %s: %s", node_id, e) + success = False + + return success + + async def test(self) -> tuple[bool, str]: + """Send test DM to all configured nodes.""" + if not self._node_ids: + return False, "No node IDs configured" + try: + for node_id in self._node_ids: + dest = int(node_id) if node_id.isdigit() else node_id + self._connector.send_message( + text="[TEST] MeshAI notification test", + destination=dest, + channel=0, + ) + return True, "Test DMs sent to %d nodes" % len(self._node_ids) + except Exception as e: + return False, "Failed to send test DMs: %s" % e + + +class EmailChannel(NotificationChannel): + """Send alert via SMTP email.""" + + channel_type = "email" + + def __init__( + self, + smtp_host: str, + smtp_port: int, + smtp_user: str, + smtp_password: str, + smtp_tls: bool, + from_address: str, + recipients: list[str], + ): + self._host = smtp_host + self._port = smtp_port + self._user = smtp_user + self._password = smtp_password + self._tls = smtp_tls + self._from = from_address + self._recipients = recipients + + async def deliver(self, alert: dict, rule: dict) -> bool: + """Send alert via email.""" + if not self._recipients: + return False + + alert_type = alert.get("type", "alert") + severity = alert.get("severity", "info").upper() + message = alert.get("message", "") + subject = "[MeshAI %s] %s" % (severity, alert_type.replace("_", " ").title()) + body = "MeshAI Alert\n\nType: %s\nSeverity: %s\nTime: %s\n\n%s\n\n---\nAutomated message from MeshAI." % ( + alert_type, severity, time.strftime("%Y-%m-%d %H:%M:%S"), message + ) + + try: + loop = asyncio.get_event_loop() + await loop.run_in_executor(None, self._send_email, subject, body) + return True + except Exception as e: + logger.error("Failed to send email: %s", e) + return False + + def _send_email(self, subject: str, body: str): + msg = MIMEMultipart() + msg["From"] = self._from + msg["To"] = ", ".join(self._recipients) + msg["Subject"] = subject + msg.attach(MIMEText(body, "plain")) + + if self._tls: + context = ssl.create_default_context() + with smtplib.SMTP(self._host, self._port) as server: + server.starttls(context=context) + if self._user and self._password: + server.login(self._user, self._password) + server.sendmail(self._from, self._recipients, msg.as_string()) + else: + with smtplib.SMTP(self._host, self._port) as server: + if self._user and self._password: + server.login(self._user, self._password) + server.sendmail(self._from, self._recipients, msg.as_string()) + + async def test(self) -> tuple[bool, str]: + try: + loop = asyncio.get_event_loop() + await loop.run_in_executor( + None, + self._send_email, + "[MeshAI TEST] Notification Test", + "Test message from MeshAI.", + ) + return True, "Test email sent to %d recipients" % len(self._recipients) + except Exception as e: + return False, "Failed to send test email: %s" % e + + +class WebhookChannel(NotificationChannel): + """POST alert JSON to a URL.""" + + channel_type = "webhook" + + def __init__(self, url: str, headers: Optional[dict] = None): + self._url = url + self._headers = headers or {} + + async def deliver(self, alert: dict, rule: dict) -> bool: + """POST alert to webhook URL.""" + payload = { + "type": alert.get("type"), + "severity": alert.get("severity", "info"), + "message": alert.get("message", ""), + "timestamp": time.time(), + "node_name": alert.get("node_name"), + "region": alert.get("region"), + } + + # Discord/Slack format + if "discord.com" in self._url or "slack.com" in self._url: + severity = alert.get("severity", "info") + color = { + "emergency": 0xFF0000, + "critical": 0xFF4444, + "warning": 0xFFAA00, + "info": 0x0099FF, + }.get(severity, 0x888888) + payload = { + "embeds": [{ + "title": "MeshAI: %s" % alert.get("type", "unknown"), + "description": alert.get("message", ""), + "color": color, + }] + } + + # ntfy format + elif "ntfy" in self._url: + headers = { + **self._headers, + "Title": "MeshAI: %s" % alert.get("type", "alert"), + "Priority": "3", + } + try: + async with httpx.AsyncClient() as client: + resp = await client.post( + self._url, + content=alert.get("message", ""), + headers=headers, + timeout=10, + ) + return resp.status_code < 400 + except Exception as e: + logger.error("Webhook failed: %s", e) + return False + + try: + async with httpx.AsyncClient() as client: + resp = await client.post( + self._url, + json=payload, + headers={"Content-Type": "application/json", **self._headers}, + timeout=10, + ) + return resp.status_code < 400 + except Exception as e: + logger.error("Webhook failed: %s", e) + return False + + async def test(self) -> tuple[bool, str]: + test_alert = {"type": "test", "severity": "info", "message": "MeshAI test message"} + success = await self.deliver(test_alert, {}) + if success: + return True, "Test sent to %s" % self._url + return False, "Webhook failed" + + +def create_channel(config: dict, connector=None) -> NotificationChannel: + """Create a channel instance from config.""" + channel_type = config.get("type", "") + + if channel_type == "mesh_broadcast": + return MeshBroadcastChannel( + connector=connector, + channel_index=config.get("channel_index", 0), + ) + elif channel_type == "mesh_dm": + return MeshDMChannel( + connector=connector, + node_ids=config.get("node_ids", []), + ) + elif channel_type == "email": + return EmailChannel( + smtp_host=config.get("smtp_host", ""), + smtp_port=config.get("smtp_port", 587), + smtp_user=config.get("smtp_user", ""), + smtp_password=config.get("smtp_password", ""), + smtp_tls=config.get("smtp_tls", True), + from_address=config.get("from_address", ""), + recipients=config.get("recipients", []), + ) + elif channel_type == "webhook": + return WebhookChannel( + url=config.get("url", ""), + headers=config.get("headers", {}), + ) + else: + raise ValueError("Unknown channel type: %s" % channel_type) diff --git a/meshai/notifications/router.py b/meshai/notifications/router.py new file mode 100644 index 0000000..8c0605d --- /dev/null +++ b/meshai/notifications/router.py @@ -0,0 +1,266 @@ +"""Notification router - matches alerts to rules and delivers via channels.""" + +import logging +import time +from datetime import datetime +from typing import Optional, TYPE_CHECKING + +from .channels import create_channel, NotificationChannel +from .summarizer import MessageSummarizer + +if TYPE_CHECKING: + from ..connector import MeshConnector + +logger = logging.getLogger(__name__) + +# Severity levels in order +SEVERITY_ORDER = ["info", "advisory", "watch", "warning", "critical", "emergency"] + + +class NotificationRouter: + """Routes alerts through matching rules to notification channels.""" + + def __init__( + self, + config, + connector: Optional["MeshConnector"] = None, + llm_backend=None, + timezone: str = "America/Boise", + ): + self._channels: dict[str, NotificationChannel] = {} + self._rules: list[dict] = [] + self._quiet_start = getattr(config, "quiet_hours_start", "22:00") + self._quiet_end = getattr(config, "quiet_hours_end", "06:00") + self._timezone = timezone + self._dedup_window = getattr(config, "dedup_seconds", 600) + self._recent: dict[tuple, float] = {} # (category, event_key) -> last_sent_time + self._summarizer = MessageSummarizer(llm_backend) if llm_backend else None + self._connector = connector + + # Create channel instances from config + channels_config = getattr(config, "channels", []) + for ch_config in channels_config: + if hasattr(ch_config, "__dict__"): + ch_dict = {k: v for k, v in ch_config.__dict__.items() if not k.startswith("_")} + else: + ch_dict = ch_config + + if not ch_dict.get("enabled", True): + continue + + channel_id = ch_dict.get("id", "") + if not channel_id: + continue + + try: + channel = create_channel(ch_dict, connector) + self._channels[channel_id] = channel + logger.debug("Created notification channel: %s (%s)", channel_id, ch_dict.get("type")) + except Exception as e: + logger.warning("Failed to create channel %s: %s", channel_id, e) + + # Load rules + rules_config = getattr(config, "rules", []) + for rule in rules_config: + if hasattr(rule, "__dict__"): + rule_dict = {k: v for k, v in rule.__dict__.items() if not k.startswith("_")} + else: + rule_dict = rule + self._rules.append(rule_dict) + + logger.info( + "Notification router initialized: %d channels, %d rules", + len(self._channels), + len(self._rules), + ) + + async def process_alert(self, alert: dict) -> bool: + """Route an alert through matching rules to channels. + + Returns True if alert was delivered to at least one channel. + """ + category = alert.get("type", "") + severity = alert.get("severity", "info") + delivered = False + + for rule in self._rules: + # Check category match + rule_categories = rule.get("categories", []) + if rule_categories and category not in rule_categories: + continue + + # Check severity threshold + min_severity = rule.get("min_severity", "info") + if not self._severity_meets(severity, min_severity): + continue + + # Check quiet hours (emergencies and criticals override) + if self._in_quiet_hours() and severity not in ("emergency", "critical"): + if not rule.get("override_quiet", False): + continue + + # Check dedup + event_id = alert.get("event_id", alert.get("message", "")[:50]) + dedup_key = (category, event_id) + now = time.time() + if dedup_key in self._recent: + if now - self._recent[dedup_key] < self._dedup_window: + logger.debug("Skipping duplicate alert: %s", category) + continue + self._recent[dedup_key] = now + + # Deliver to each channel in the rule + channel_ids = rule.get("channel_ids", []) + for channel_id in channel_ids: + channel = self._channels.get(channel_id) + if not channel: + continue + + try: + # Summarize for mesh channels if over 200 chars + delivery_alert = alert + message = alert.get("message", "") + if channel.channel_type in ("mesh_broadcast", "mesh_dm"): + if len(message) > 200: + if self._summarizer: + summary = await self._summarizer.summarize(message, max_chars=195) + delivery_alert = {**alert, "message": summary} + else: + delivery_alert = {**alert, "message": message[:195] + "..."} + + success = await channel.deliver(delivery_alert, rule) + if success: + delivered = True + logger.info( + "Alert delivered via %s: %s", + channel_id, + category, + ) + except Exception as e: + logger.warning("Channel %s delivery failed: %s", channel_id, e) + + return delivered + + def _severity_meets(self, actual: str, required: str) -> bool: + """Check if actual severity meets or exceeds required severity.""" + try: + actual_idx = SEVERITY_ORDER.index(actual.lower()) + required_idx = SEVERITY_ORDER.index(required.lower()) + return actual_idx >= required_idx + except ValueError: + return True # Unknown severity, allow through + + def _in_quiet_hours(self) -> bool: + """Check if current time is within quiet hours.""" + try: + from zoneinfo import ZoneInfo + tz = ZoneInfo(self._timezone) + now = datetime.now(tz) + current_time = now.strftime("%H:%M") + + start = self._quiet_start + end = self._quiet_end + + if start <= end: + # Simple range (e.g., 01:00 to 06:00) + return start <= current_time <= end + else: + # Crosses midnight (e.g., 22:00 to 06:00) + return current_time >= start or current_time <= end + except Exception: + return False + + def get_channels(self) -> list[dict]: + """Get list of configured channels.""" + return [ + {"id": ch_id, "type": ch.channel_type} + for ch_id, ch in self._channels.items() + ] + + def get_rules(self) -> list[dict]: + """Get list of configured rules.""" + return self._rules + + async def test_channel(self, channel_id: str) -> tuple[bool, str]: + """Send a test alert to a specific channel.""" + channel = self._channels.get(channel_id) + if not channel: + return False, "Channel not found: %s" % channel_id + return await channel.test() + + def add_mesh_subscription( + self, + node_id: str, + categories: list[str], + rule_name: Optional[str] = None, + ) -> str: + """Add a mesh DM subscription for a node. + + Creates a channel and rule for the node to receive alerts. + Returns the rule name. + """ + # Create channel ID + channel_id = "mesh_dm_%s" % node_id + + # Create channel if it doesn't exist + if channel_id not in self._channels: + from .channels import MeshDMChannel + channel = MeshDMChannel( + connector=self._connector, + node_ids=[node_id], + ) + self._channels[channel_id] = channel + + # Create rule + if not rule_name: + rule_name = "sub_%s" % node_id + + # Check if rule already exists + for rule in self._rules: + if rule.get("name") == rule_name: + # Update existing rule + rule["categories"] = categories if categories else [] + rule["channel_ids"] = [channel_id] + return rule_name + + # Add new rule + self._rules.append({ + "name": rule_name, + "categories": categories if categories else [], # Empty = all + "min_severity": "warning", + "channel_ids": [channel_id], + "override_quiet": False, + }) + + return rule_name + + def remove_mesh_subscription(self, node_id: str) -> bool: + """Remove a mesh subscription for a node.""" + channel_id = "mesh_dm_%s" % node_id + rule_name = "sub_%s" % node_id + + # Remove channel + if channel_id in self._channels: + del self._channels[channel_id] + + # Remove rule + self._rules = [r for r in self._rules if r.get("name") != rule_name] + + return True + + def get_node_subscriptions(self, node_id: str) -> list[str]: + """Get categories a node is subscribed to.""" + rule_name = "sub_%s" % node_id + for rule in self._rules: + if rule.get("name") == rule_name: + categories = rule.get("categories", []) + return categories if categories else ["all"] + return [] + + def cleanup_recent(self, max_age: int = 3600): + """Clean up old entries from recent alerts cache.""" + now = time.time() + self._recent = { + k: v for k, v in self._recent.items() + if now - v < max_age + } diff --git a/meshai/notifications/summarizer.py b/meshai/notifications/summarizer.py new file mode 100644 index 0000000..cccca63 --- /dev/null +++ b/meshai/notifications/summarizer.py @@ -0,0 +1,64 @@ +"""Message summarizer for mesh delivery.""" + +import logging +from typing import Optional, TYPE_CHECKING + +if TYPE_CHECKING: + from ..backends import LLMBackend + +logger = logging.getLogger(__name__) + + +class MessageSummarizer: + """Summarizes long messages for mesh delivery. + + Only used when: + - Delivering to mesh channels (broadcast or DM) + - Message exceeds max_chars (default 200) + - LLM backend is available + + Email and webhook channels receive full messages. + """ + + def __init__(self, llm_backend: Optional["LLMBackend"] = None): + self._llm = llm_backend + + async def summarize(self, message: str, max_chars: int = 195) -> str: + """Summarize a message to fit within max_chars. + + Args: + message: Original message text + max_chars: Maximum characters for summary + + Returns: + Summarized message, or truncated original if LLM unavailable + """ + if len(message) <= max_chars: + return message + + if not self._llm: + return message[:max_chars - 3] + "..." + + prompt = ( + "Summarize this alert in under %d characters. " + "Keep severity, location, and key facts. No preamble, just the summary:\n\n%s" + % (max_chars, message) + ) + + try: + # Use the LLM to generate a summary + response = await self._llm.generate( + prompt, + system_prompt="You are a concise alert summarizer. Output only the summary, no explanation.", + max_tokens=100, + ) + summary = response.strip() + + # Ensure it fits + if len(summary) <= max_chars: + return summary + return summary[:max_chars - 3] + "..." + + except Exception as e: + logger.debug("LLM summarization failed: %s", e) + return message[:max_chars - 3] + "..." diff --git a/meshai/sources/mqtt_source.py b/meshai/sources/mqtt_source.py new file mode 100644 index 0000000..1336802 --- /dev/null +++ b/meshai/sources/mqtt_source.py @@ -0,0 +1,435 @@ +"""MQTT source adapter for Meshtastic broker subscriptions. + +Push-based source that subscribes to MQTT topics and decodes +ServiceEnvelope-wrapped MeshPackets. Provides live node/packet +data without polling. +""" + +import asyncio +import logging +import os +import time +from dataclasses import dataclass, field +from typing import Optional + +logger = logging.getLogger(__name__) + +# Port number to name mapping (from portnums_pb2) +PORTNUM_NAMES = { + 0: "UNKNOWN_APP", + 1: "TEXT_MESSAGE_APP", + 2: "REMOTE_HARDWARE_APP", + 3: "POSITION_APP", + 4: "NODEINFO_APP", + 5: "ROUTING_APP", + 6: "ADMIN_APP", + 7: "TEXT_MESSAGE_COMPRESSED_APP", + 8: "WAYPOINT_APP", + 9: "AUDIO_APP", + 10: "DETECTION_SENSOR_APP", + 11: "ALERT_APP", + 32: "REPLY_APP", + 33: "IP_TUNNEL_APP", + 34: "PAXCOUNTER_APP", + 64: "SERIAL_APP", + 65: "STORE_FORWARD_APP", + 66: "RANGE_TEST_APP", + 67: "TELEMETRY_APP", + 68: "ZPS_APP", + 69: "SIMULATOR_APP", + 70: "TRACEROUTE_APP", + 71: "NEIGHBORINFO_APP", + 72: "ATAK_PLUGIN", + 73: "MAP_REPORT_APP", + 74: "POWERSTRESS_APP", + 256: "PRIVATE_APP", + 257: "ATAK_FORWARDER", +} + + +@dataclass +class MQTTNodeInfo: + """Cached node info from MQTT.""" + + node_num: int + node_id_hex: str = "" + short_name: str = "" + long_name: str = "" + hw_model: str = "" + role: int = 0 + latitude: Optional[float] = None + longitude: Optional[float] = None + altitude: Optional[float] = None + last_heard: float = 0.0 + battery_percent: Optional[float] = None + voltage: Optional[float] = None + channel_utilization: Optional[float] = None + air_util_tx: Optional[float] = None + snr: Optional[float] = None + rssi: Optional[int] = None + via_mqtt: bool = True + + +@dataclass +class MQTTPacketInfo: + """Packet received from MQTT.""" + + packet_id: int + from_node: int + to_node: int + portnum: int + portnum_name: str + channel: int + timestamp: float + snr: Optional[float] = None + rssi: Optional[int] = None + hop_limit: Optional[int] = None + hop_start: Optional[int] = None + payload_size: int = 0 + gateway_id: str = "" + + +class MQTTSource: + """MQTT source adapter subscribing to Meshtastic broker topics. + + Maintains a subscription loop that processes ServiceEnvelope messages + and updates node/packet caches. Unlike poll-based sources, this is + push-based and receives data as it arrives. + """ + + def __init__( + self, + host: str, + port: int = 1883, + username: str = "", + password: str = "", + topic_root: str = "msh/US", + use_tls: bool = False, + name: str = "mqtt", + ): + """Initialize MQTT source. + + Args: + host: MQTT broker hostname + port: MQTT broker port (1883 for plain, 8883 for TLS) + username: MQTT username (optional) + password: MQTT password (optional, supports ${ENV_VAR}) + topic_root: Topic root to subscribe to (default: msh/US) + use_tls: Enable TLS for connection + name: Source name for logging/attribution + """ + self._host = host + self._port = port + self._username = username + self._password = self._resolve_env(password) + self._topic_root = topic_root.rstrip("/") + self._use_tls = use_tls + self._name = name + + # State + self._nodes: dict[int, MQTTNodeInfo] = {} + self._packets: list[MQTTPacketInfo] = [] + self._max_packets = 1000 # Ring buffer + self._is_connected: bool = False + self._is_loaded: bool = False + self._last_message: float = 0.0 + self._last_error: str = "" + self._message_count: int = 0 + self._data_changed: bool = False + + # Subscription task + self._task: Optional[asyncio.Task] = None + self._stop_event: Optional[asyncio.Event] = None + + # Retry settings + self._retry_delay = 5 # Initial retry delay + self._max_retry_delay = 300 # Max 5 minutes between retries + + def _resolve_env(self, value: str) -> str: + """Resolve ${ENV_VAR} references in value.""" + if value and value.startswith("${") and value.endswith("}"): + env_var = value[2:-1] + return os.environ.get(env_var, "") + return value + + @property + def nodes(self) -> dict[int, MQTTNodeInfo]: + """Return cached nodes.""" + return self._nodes + + @property + def packets(self) -> list[dict]: + """Return packets as dicts for compatibility.""" + return [ + { + "packet_id": p.packet_id, + "from_node": p.from_node, + "to_node": p.to_node, + "portnum": p.portnum, + "portnum_name": p.portnum_name, + "channel": p.channel, + "timestamp": p.timestamp, + "snr": p.snr, + "rssi": p.rssi, + "hop_limit": p.hop_limit, + "hop_start": p.hop_start, + "payload_size": p.payload_size, + "gateway_id": p.gateway_id, + } + for p in self._packets + ] + + @property + def is_loaded(self) -> bool: + """Return True if we have received any data.""" + return self._is_loaded + + @property + def data_changed(self) -> bool: + """Return True if data changed since last check, then reset.""" + changed = self._data_changed + self._data_changed = False + return changed + + @property + def health_status(self) -> dict: + """Return health status for dashboard.""" + return { + "name": self._name, + "type": "mqtt", + "host": self._host, + "port": self._port, + "topic_root": self._topic_root, + "is_connected": self._is_connected, + "is_loaded": self._is_loaded, + "last_message": self._last_message, + "last_error": self._last_error, + "message_count": self._message_count, + "node_count": len(self._nodes), + "packet_count": len(self._packets), + } + + async def start(self) -> None: + """Start the subscription loop.""" + if self._task is not None: + logger.warning(f"MQTT source '{self._name}' already started") + return + + self._stop_event = asyncio.Event() + self._task = asyncio.create_task(self._subscription_loop()) + logger.info(f"Started MQTT source '{self._name}' -> {self._host}:{self._port}") + + async def stop(self) -> None: + """Stop the subscription loop.""" + if self._stop_event: + self._stop_event.set() + if self._task: + self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + pass + self._task = None + self._is_connected = False + logger.info(f"Stopped MQTT source '{self._name}'") + + async def _subscription_loop(self) -> None: + """Main subscription loop with reconnection logic.""" + try: + import aiomqtt + except ImportError: + logger.error("aiomqtt not installed. Run: pip install aiomqtt") + self._last_error = "aiomqtt not installed" + return + + retry_delay = self._retry_delay + + while not self._stop_event.is_set(): + try: + # Build connection kwargs + kwargs = { + "hostname": self._host, + "port": self._port, + } + if self._username: + kwargs["username"] = self._username + if self._password: + kwargs["password"] = self._password + + # TLS setup + if self._use_tls: + import ssl + tls_context = ssl.create_default_context() + kwargs["tls_context"] = tls_context + + async with aiomqtt.Client(**kwargs) as client: + self._is_connected = True + self._last_error = "" + retry_delay = self._retry_delay # Reset on successful connect + logger.info(f"MQTT '{self._name}' connected to {self._host}:{self._port}") + + # Subscribe to all topics under root + # Meshtastic uses: msh/{region}/{channel}/json/{node_id} + # and: msh/{region}/{channel}/!{node_id} + topic = f"{self._topic_root}/#" + await client.subscribe(topic) + logger.info(f"MQTT '{self._name}' subscribed to {topic}") + + async for message in client.messages: + if self._stop_event.is_set(): + break + await self._process_message(message) + + except asyncio.CancelledError: + break + except Exception as e: + self._is_connected = False + self._last_error = str(e) + logger.warning(f"MQTT '{self._name}' error: {e}. Retrying in {retry_delay}s") + + # Exponential backoff + await asyncio.sleep(retry_delay) + retry_delay = min(retry_delay * 2, self._max_retry_delay) + + async def _process_message(self, message) -> None: + """Process an incoming MQTT message.""" + try: + topic = str(message.topic) + payload = message.payload + + # Skip JSON topics (we want binary ServiceEnvelope) + if "/json/" in topic: + return + + # Skip map reports (stat/ or map/ topics) + if "/stat/" in topic or "/map/" in topic: + return + + # Parse ServiceEnvelope + from meshtastic.protobuf import mqtt_pb2 + + envelope = mqtt_pb2.ServiceEnvelope() + envelope.ParseFromString(payload) + + if not envelope.packet: + return + + packet = envelope.packet + gateway_id = envelope.gateway_id or "" + channel_id = envelope.channel_id or "" + + # Update stats + self._last_message = time.time() + self._message_count += 1 + self._is_loaded = True + self._data_changed = True + + # Extract packet info + pkt_info = MQTTPacketInfo( + packet_id=packet.id, + from_node=packet.from_, + to_node=packet.to, + portnum=packet.decoded.portnum if packet.HasField("decoded") else 0, + portnum_name=PORTNUM_NAMES.get( + packet.decoded.portnum if packet.HasField("decoded") else 0, + "UNKNOWN" + ), + channel=packet.channel, + timestamp=time.time(), + snr=packet.rx_snr if packet.rx_snr else None, + rssi=packet.rx_rssi if packet.rx_rssi else None, + hop_limit=packet.hop_limit if packet.hop_limit else None, + hop_start=packet.hop_start if packet.hop_start else None, + payload_size=len(packet.decoded.payload) if packet.HasField("decoded") else 0, + gateway_id=gateway_id, + ) + + # Add to packet ring buffer + self._packets.append(pkt_info) + if len(self._packets) > self._max_packets: + self._packets = self._packets[-self._max_packets:] + + # Process decoded payload by portnum + if packet.HasField("decoded"): + await self._process_decoded(packet, gateway_id) + + except Exception as e: + logger.debug(f"MQTT message parse error: {e}") + + async def _process_decoded(self, packet, gateway_id: str) -> None: + """Process decoded packet payload.""" + decoded = packet.decoded + portnum = decoded.portnum + from_node = packet.from_ + + # Ensure node exists in cache + if from_node not in self._nodes: + self._nodes[from_node] = MQTTNodeInfo( + node_num=from_node, + node_id_hex=f"!{from_node:08x}", + ) + + node = self._nodes[from_node] + node.last_heard = time.time() + node.snr = packet.rx_snr if packet.rx_snr else node.snr + node.rssi = packet.rx_rssi if packet.rx_rssi else node.rssi + + # NODEINFO_APP (4) + if portnum == 4: + from meshtastic.protobuf import mesh_pb2 + user = mesh_pb2.User() + try: + user.ParseFromString(decoded.payload) + node.short_name = user.short_name or node.short_name + node.long_name = user.long_name or node.long_name + node.hw_model = mesh_pb2.HardwareModel.Name(user.hw_model) if user.hw_model else "" + node.role = user.role + except Exception: + pass + + # POSITION_APP (3) + elif portnum == 3: + from meshtastic.protobuf import mesh_pb2 + pos = mesh_pb2.Position() + try: + pos.ParseFromString(decoded.payload) + if pos.latitude_i: + node.latitude = pos.latitude_i * 1e-7 + if pos.longitude_i: + node.longitude = pos.longitude_i * 1e-7 + if pos.altitude: + node.altitude = pos.altitude + except Exception: + pass + + # TELEMETRY_APP (67) + elif portnum == 67: + from meshtastic.protobuf import telemetry_pb2 + telem = telemetry_pb2.Telemetry() + try: + telem.ParseFromString(decoded.payload) + if telem.HasField("device_metrics"): + dm = telem.device_metrics + if dm.battery_level and dm.battery_level <= 100: + node.battery_percent = dm.battery_level + if dm.voltage: + node.voltage = dm.voltage + if dm.channel_utilization: + node.channel_utilization = dm.channel_utilization + if dm.air_util_tx: + node.air_util_tx = dm.air_util_tx + except Exception: + pass + + # Compatibility methods for MeshDataStore integration + + def tick(self) -> Optional[str]: + """Tick method for compatibility. MQTT is push-based, not polled. + + Returns None since we do not poll endpoints. + """ + return None + + def maybe_refresh(self) -> bool: + """Check if data changed (for legacy compatibility).""" + return self.data_changed