feat(env): NWS weather alerts, NOAA space weather, tropospheric ducting

- Environmental feed system with tick-based adapters
- NWS Active Alerts: polls api.weather.gov, zone-based filtering
- NOAA SWPC: Kp, SFI, R/S/G scales, band assessment, alert detection
- Tropospheric ducting: Open-Meteo GFS refractivity profile, duct classification
- !alerts command for active weather warnings
- !solar / !hf commands for RF propagation (HF + UHF ducting)
- Alert engine integration: severe weather, R3+ blackout, ducting events
- LLM context injection for weather/propagation queries
- Dashboard RF Propagation card with HF + UHF ducting display
- EnvironmentalConfig with per-feed toggles in config.yaml
This commit is contained in:
K7ZVX 2026-05-12 17:21:43 +00:00
commit 549ae4bdfb
20 changed files with 4142 additions and 2652 deletions

View file

@ -124,6 +124,48 @@ mesh_intelligence:
infra_overrides: []
region_labels: {}
# === ENVIRONMENTAL FEEDS ===
# Live situational awareness from NWS, NOAA Space Weather, and Open-Meteo.
# Provides weather alerts, HF propagation assessment, and tropospheric ducting.
#
environmental:
enabled: false
nws_zones:
- "IDZ016" # Western Magic Valley
- "IDZ030" # Southern Twin Falls County
# NWS Weather Alerts (api.weather.gov)
nws:
enabled: true
tick_seconds: 60
areas: ["ID"]
severity_min: "moderate"
user_agent: "(meshai.example.com, ops@example.com)" # REQUIRED by NWS
# NOAA Space Weather (services.swpc.noaa.gov)
swpc:
enabled: true
# Tropospheric ducting assessment (Open-Meteo GFS, no auth)
ducting:
enabled: true
tick_seconds: 10800 # 3 hours
latitude: 42.56 # center of mesh coverage area
longitude: -114.47
# NIFC Fire Perimeters (Phase 2)
fires:
enabled: false
tick_seconds: 600
state: "US-ID"
# Avalanche Advisories (Phase 2)
avalanche:
enabled: false
tick_seconds: 1800
center_ids: ["SNFAC"]
season_months: [12, 1, 2, 3, 4]
# === WEB DASHBOARD ===
dashboard:
enabled: true

View file

@ -82,14 +82,72 @@ export interface Alert {
export interface EnvStatus {
enabled: boolean
feeds: unknown[]
feeds: EnvFeedHealth[]
}
export interface EnvFeedHealth {
source: string
is_loaded: boolean
last_error: string | null
consecutive_errors: number
event_count: number
last_fetch: number
}
export interface EnvEvent {
type: string
source: string
event_id: string
event_type: string
severity: string
headline: string
description?: string
expires?: number
fetched_at: number
[key: string]: unknown
}
export interface SWPCStatus {
enabled: boolean
kp_current?: number
kp_timestamp?: string
sfi?: number
r_scale?: number
s_scale?: number
g_scale?: number
band_assessment?: string
band_detail?: string
active_warnings?: string[]
}
export interface DuctingStatus {
enabled: boolean
condition?: string
min_gradient?: number
duct_thickness_m?: number | null
duct_base_m?: number | null
assessment?: string
last_update?: string
}
export interface RFPropagation {
hf: {
kp_current?: number
sfi?: number
r_scale?: number
s_scale?: number
g_scale?: number
band_assessment?: string
band_detail?: string
active_warnings?: string[]
}
uhf_ducting: {
condition?: string
min_gradient?: number
duct_thickness_m?: number | null
assessment?: string
}
}
// API fetch helpers
async function fetchJson<T>(url: string): Promise<T> {
@ -152,6 +210,18 @@ export async function fetchEnvActive(): Promise<EnvEvent[]> {
return fetchJson<EnvEvent[]>('/api/env/active')
}
export async function fetchRFPropagation(): Promise<RFPropagation> {
return fetchJson<RFPropagation>('/api/env/propagation')
}
export async function fetchSWPC(): Promise<SWPCStatus> {
return fetchJson<SWPCStatus>('/api/env/swpc')
}
export async function fetchDucting(): Promise<DuctingStatus> {
return fetchJson<DuctingStatus>('/api/env/ducting')
}
export async function fetchRegions(): Promise<unknown[]> {
return fetchJson<unknown[]>('/api/regions')
}

View file

@ -4,10 +4,12 @@ import {
fetchSources,
fetchAlerts,
fetchEnvStatus,
fetchRFPropagation,
type MeshHealth,
type SourceHealth,
type Alert,
type EnvStatus,
type RFPropagation,
} from '@/lib/api'
import { useWebSocket } from '@/hooks/useWebSocket'
import {
@ -19,6 +21,7 @@ import {
Cpu,
Activity,
MapPin,
Zap,
} from 'lucide-react'
function HealthGauge({ health }: { health: MeshHealth }) {
@ -174,7 +177,7 @@ function SourceCard({ source }: { source: SourceHealth }) {
<div className="flex-1 min-w-0">
<div className="text-sm text-slate-200 truncate">{source.name}</div>
<div className="text-xs text-slate-500">
{source.node_count} nodes {source.type}
{source.node_count} nodes * {source.type}
</div>
</div>
</div>
@ -206,11 +209,121 @@ function StatCard({
)
}
function RFPropagationCard({ propagation }: { propagation: RFPropagation | null }) {
if (!propagation) {
return (
<div className="bg-bg-card border border-border rounded-lg p-6">
<h2 className="text-sm font-medium text-slate-400 mb-4">
RF Propagation
</h2>
<div className="text-slate-500">
<p>Loading propagation data...</p>
</div>
</div>
)
}
const hf = propagation.hf
const ducting = propagation.uhf_ducting
const getAssessmentColor = (assessment?: string) => {
if (!assessment) return 'text-slate-400'
switch (assessment.toLowerCase()) {
case 'excellent':
return 'text-green-400'
case 'good':
return 'text-green-500'
case 'fair':
return 'text-amber-500'
case 'poor':
return 'text-red-500'
default:
return 'text-slate-400'
}
}
const getDuctingColor = (condition?: string) => {
if (!condition) return 'text-slate-400'
switch (condition) {
case 'normal':
return 'text-green-500'
case 'super_refraction':
return 'text-amber-500'
case 'surface_duct':
case 'elevated_duct':
return 'text-blue-400'
default:
return 'text-slate-400'
}
}
const hasHF = hf && (hf.band_assessment || hf.sfi || hf.kp_current !== undefined)
const hasDucting = ducting && ducting.condition
return (
<div className="bg-bg-card border border-border rounded-lg p-6">
<h2 className="text-sm font-medium text-slate-400 mb-4 flex items-center gap-2">
<Zap size={14} />
RF Propagation
</h2>
{/* HF Section */}
<div className="mb-4">
<div className="text-xs text-slate-500 mb-1">HF Bands</div>
{hasHF ? (
<div className="space-y-1">
<div className={`text-sm font-medium ${getAssessmentColor(hf.band_assessment)}`}>
{hf.band_assessment || 'Unknown'}
</div>
<div className="text-xs text-slate-400">
SFI {hf.sfi?.toFixed(0) || '?'} / Kp {hf.kp_current?.toFixed(1) || '?'}
</div>
{hf.r_scale !== undefined && hf.r_scale > 0 && (
<div className="text-xs text-amber-500">
R{hf.r_scale} Radio Blackout
</div>
)}
</div>
) : (
<div className="text-sm text-slate-500">No HF data</div>
)}
</div>
{/* UHF Ducting Section */}
<div>
<div className="text-xs text-slate-500 mb-1">UHF 906 MHz</div>
{hasDucting ? (
<div className="space-y-1">
<div className={`text-sm font-medium ${getDuctingColor(ducting.condition)}`}>
{ducting.condition === 'normal'
? 'Normal'
: ducting.condition?.replace('_', ' ').replace(/\b\w/g, l => l.toUpperCase())}
</div>
{ducting.condition !== 'normal' && ducting.min_gradient !== undefined && (
<div className="text-xs text-slate-400">
dM/dz: {ducting.min_gradient} M-units/km
</div>
)}
{ducting.condition !== 'normal' && (
<div className="text-xs text-blue-400">
Extended range likely
</div>
)}
</div>
) : (
<div className="text-sm text-slate-500">No ducting data</div>
)}
</div>
</div>
)
}
export default function Dashboard() {
const [health, setHealth] = useState<MeshHealth | null>(null)
const [sources, setSources] = useState<SourceHealth[]>([])
const [alerts, setAlerts] = useState<Alert[]>([])
const [envStatus, setEnvStatus] = useState<EnvStatus | null>(null)
const [rfProp, setRFProp] = useState<RFPropagation | null>(null)
const [loading, setLoading] = useState(true)
const [error, setError] = useState<string | null>(null)
@ -222,12 +335,14 @@ export default function Dashboard() {
fetchSources(),
fetchAlerts(),
fetchEnvStatus(),
fetchRFPropagation().catch(() => null),
])
.then(([h, src, a, e]) => {
.then(([h, src, a, e, rf]) => {
setHealth(h)
setSources(src)
setAlerts(a)
setEnvStatus(e)
setRFProp(rf)
setLoading(false)
})
.catch((err) => {
@ -360,22 +475,14 @@ export default function Dashboard() {
<div className="text-slate-500">
<p>Environmental feeds not enabled.</p>
<p className="text-xs mt-2">
Enable in Config Mesh Intelligence
Enable in config.yaml
</p>
</div>
)}
</div>
{/* HF Propagation placeholder */}
<div className="bg-bg-card border border-border rounded-lg p-6">
<h2 className="text-sm font-medium text-slate-400 mb-4">
HF Propagation
</h2>
<div className="text-slate-500">
<p>Space weather data not enabled.</p>
<p className="text-xs mt-2">Coming in Phase 1</p>
</div>
</div>
{/* RF Propagation */}
<RFPropagationCard propagation={rfProp} />
</div>
)
}

View file

@ -581,3 +581,84 @@ class AlertEngine:
scope_type=alert.get("scope_type"),
scope_value=alert.get("scope_value"),
)
def check_environmental(self, env_store) -> list[dict]:
"""Check environmental feeds for alertable conditions.
Args:
env_store: EnvironmentalStore instance
Returns:
List of alert dicts
"""
alerts = []
now = time.time()
# NWS severe weather affecting mesh zones
mesh_zones = set(getattr(env_store, "_mesh_zones", []))
for evt in env_store.get_active(source="nws"):
if evt.get("severity") not in ("severe", "extreme", "warning"):
continue
event_zones = set(evt.get("areas", []))
if mesh_zones and not (event_zones & mesh_zones):
continue
key = f"env_nws_{evt['event_id']}"
state = self._get_state(key)
if not state.should_fire(now):
continue
state.fire(now)
alerts.append({
"type": "weather_warning",
"message": f"Warning: {evt['event_type']}: {evt.get('headline', '')[:150]}",
"severity": evt["severity"],
"node_num": None,
"node_name": evt["event_type"],
"node_short": "NWS",
"region": "",
"scope_type": "mesh",
"scope_value": None,
"is_critical": evt["severity"] in ("extreme", "emergency"),
})
# SWPC R-scale >= 3 (HF blackout affecting mesh backhaul)
swpc = env_store.get_swpc_status()
if swpc and swpc.get("r_scale", 0) >= 3:
r_scale = swpc["r_scale"]
key = f"env_swpc_r{r_scale}"
state = self._get_state(key)
if state.should_fire(now):
state.fire(now)
alerts.append({
"type": "hf_blackout",
"message": f"Warning: R{r_scale} HF Radio Blackout -- mesh backhaul links may degrade",
"severity": "warning",
"node_num": None,
"node_name": f"R{r_scale} Blackout",
"node_short": "SWPC",
"region": "",
"scope_type": "mesh",
"scope_value": None,
"is_critical": r_scale >= 4,
})
# UHF ducting (informational -- not critical but operators want to know)
ducting = env_store.get_ducting_status()
if ducting and ducting.get("condition") in ("surface_duct", "elevated_duct"):
key = "env_ducting_active"
state = self._get_state(key)
if state.should_fire(now):
state.fire(now)
alerts.append({
"type": "uhf_ducting",
"message": "UHF ducting detected -- 906 MHz range may be extended, expect distant nodes",
"severity": "info",
"node_num": None,
"node_name": "Ducting",
"node_short": "UHF",
"region": "",
"scope_type": "mesh",
"scope_value": None,
"is_critical": False,
})
return alerts

View file

@ -0,0 +1,49 @@
"""Alerts command handler."""
import time
from datetime import datetime
from .base import CommandContext, CommandHandler
class AlertsCommand(CommandHandler):
"""Active weather alerts for mesh area."""
name = "alerts"
description = "Active weather alerts for mesh area"
usage = "!alerts"
def __init__(self, env_store):
self._env_store = env_store
async def execute(self, args: str, context: CommandContext) -> str:
"""Execute the alerts command."""
if not self._env_store:
return "Environmental feeds not enabled."
zones = self._env_store._mesh_zones
alerts = self._env_store.get_for_zones(zones)
if not alerts:
alerts = self._env_store.get_active(source="nws")
if not alerts:
return "No active weather alerts for the mesh area."
lines = [f"Active Alerts ({len(alerts)}):"]
for a in alerts[:5]:
# Format expiry time
expires = a.get("expires", 0)
if expires:
try:
dt = datetime.fromtimestamp(expires)
expires_str = dt.strftime("%b %d %H:%MZ")
except Exception:
expires_str = "Unknown"
else:
expires_str = "Unknown"
lines.append(f"* {a['event_type']} -- {a.get('area_desc', '')[:60]}")
lines.append(f" Until {expires_str}")
return "\n".join(lines)

View file

@ -161,6 +161,7 @@ def create_dispatcher(
data_store=None,
health_engine=None,
subscription_manager=None,
env_store=None,
) -> CommandDispatcher:
"""Create and populate command dispatcher with default commands.
@ -172,6 +173,7 @@ def create_dispatcher(
data_store: MeshDataStore for neighbor data
health_engine: MeshHealthEngine for infrastructure detection
subscription_manager: SubscriptionManager for subscription commands
env_store: EnvironmentalStore for weather/propagation commands
Returns:
Configured CommandDispatcher
@ -243,6 +245,27 @@ def create_dispatcher(
alias_handler.name = alias
dispatcher.register(alias_handler)
# Register environmental commands
if env_store:
from .alerts_cmd import AlertsCommand
from .solar_cmd import SolarCommand
alerts_cmd = AlertsCommand(env_store)
dispatcher.register(alerts_cmd)
solar_cmd = SolarCommand(env_store)
dispatcher.register(solar_cmd)
# Register !hf as an alias for !solar
hf_cmd = SolarCommand(env_store)
hf_cmd.name = "hf"
dispatcher.register(hf_cmd)
# Register !wx-alerts as an alias for !alerts
wx_cmd = AlertsCommand(env_store)
wx_cmd.name = "wx-alerts"
dispatcher.register(wx_cmd)
# Register custom commands
if custom_commands:
for name, response in custom_commands.items():

View file

@ -0,0 +1,63 @@
"""Solar/RF propagation command handler."""
from .base import CommandContext, CommandHandler
class SolarCommand(CommandHandler):
"""Space weather & RF propagation."""
name = "solar"
description = "Space weather & RF propagation"
usage = "!solar"
def __init__(self, env_store):
self._env_store = env_store
async def execute(self, args: str, context: CommandContext) -> str:
"""Execute the solar command."""
if not self._env_store:
return "Environmental feeds not enabled."
lines = []
# HF section
s = self._env_store.get_swpc_status()
if s:
assessment = s.get("band_assessment", "Unknown")
kp = s.get("kp_current", "?")
sfi = s.get("sfi", "?")
r = s.get("r_scale", 0)
s_sc = s.get("s_scale", 0)
g = s.get("g_scale", 0)
lines.append(f"HF: {assessment} -- SFI {sfi}, Kp {kp}")
lines.append(f" R{r}/S{s_sc}/G{g} scales")
if assessment in ("Excellent", "Good"):
lines.append(" 10m-20m open, solid DX")
elif assessment == "Fair":
lines.append(" 20m-40m usable, upper bands marginal")
else:
lines.append(" Degraded -- lower bands only")
warnings = s.get("active_warnings", [])
for w in warnings[:2]:
lines.append(f" Warning: {w[:100]}")
else:
lines.append("HF: Data not available")
# UHF ducting section
d = self._env_store.get_ducting_status()
if d:
cond = d.get("condition", "unknown")
if cond == "normal":
lines.append("UHF: Normal propagation (906 MHz)")
else:
gradient = d.get("min_gradient", "?")
lines.append(f"UHF: {cond.replace('_', ' ').title()} (906 MHz)")
lines.append(f" dM/dz: {gradient} M-units/km")
lines.append(" Extended range -- expect distant nodes")
else:
lines.append("UHF: Ducting data not available")
return "\n".join(lines)

View file

@ -257,6 +257,65 @@ class MeshIntelligenceConfig:
alert_rules: AlertRulesConfig = field(default_factory=AlertRulesConfig)
# Environmental feed configs
@dataclass
class NWSConfig:
"""NWS weather alerts settings."""
enabled: bool = True
tick_seconds: int = 60
areas: list = field(default_factory=lambda: ["ID"])
severity_min: str = "moderate"
user_agent: str = ""
@dataclass
class SWPCConfig:
"""NOAA Space Weather settings."""
enabled: bool = True
@dataclass
class DuctingConfig:
"""Tropospheric ducting settings."""
enabled: bool = True
tick_seconds: int = 10800 # 3 hours
latitude: float = 42.56 # Twin Falls area default
longitude: float = -114.47
@dataclass
class NICFFiresConfig:
"""NIFC fire perimeters settings (Phase 2)."""
enabled: bool = False
tick_seconds: int = 600
state: str = "US-ID"
@dataclass
class AvalancheConfig:
"""Avalanche advisory settings (Phase 2)."""
enabled: bool = False
tick_seconds: int = 1800
center_ids: list = field(default_factory=lambda: ["SNFAC"])
season_months: list = field(default_factory=lambda: [12, 1, 2, 3, 4])
@dataclass
class EnvironmentalConfig:
"""Environmental feeds settings."""
enabled: bool = False
nws_zones: list = field(default_factory=lambda: ["IDZ016", "IDZ030"])
nws: NWSConfig = field(default_factory=NWSConfig)
swpc: SWPCConfig = field(default_factory=SWPCConfig)
ducting: DuctingConfig = field(default_factory=DuctingConfig)
fires: NICFFiresConfig = field(default_factory=NICFFiresConfig)
avalanche: AvalancheConfig = field(default_factory=AvalancheConfig)
@dataclass
@ -284,6 +343,7 @@ class Config:
knowledge: KnowledgeConfig = field(default_factory=KnowledgeConfig)
mesh_sources: list[MeshSourceConfig] = field(default_factory=list)
mesh_intelligence: MeshIntelligenceConfig = field(default_factory=MeshIntelligenceConfig)
environmental: EnvironmentalConfig = field(default_factory=EnvironmentalConfig)
dashboard: DashboardConfig = field(default_factory=DashboardConfig)
_config_path: Optional[Path] = field(default=None, repr=False)
@ -339,6 +399,17 @@ def _dict_to_dataclass(cls, data: dict):
# Handle AlertRulesConfig
elif key == "alert_rules" and isinstance(value, dict):
kwargs[key] = _dict_to_dataclass(AlertRulesConfig, value)
# Handle nested environmental configs
elif key == "nws" and isinstance(value, dict):
kwargs[key] = _dict_to_dataclass(NWSConfig, value)
elif key == "swpc" and isinstance(value, dict):
kwargs[key] = _dict_to_dataclass(SWPCConfig, value)
elif key == "ducting" and isinstance(value, dict):
kwargs[key] = _dict_to_dataclass(DuctingConfig, value)
elif key == "fires" and isinstance(value, dict):
kwargs[key] = _dict_to_dataclass(NICFFiresConfig, value)
elif key == "avalanche" and isinstance(value, dict):
kwargs[key] = _dict_to_dataclass(AvalancheConfig, value)
else:
kwargs[key] = value

View file

@ -1,4 +1,4 @@
"""Environmental data API routes (Phase 1 placeholder)."""
"""Environmental data API routes."""
from fastapi import APIRouter, Request
@ -8,37 +8,70 @@ router = APIRouter(tags=["environment"])
@router.get("/env/status")
async def get_env_status(request: Request):
"""Get environmental feeds status."""
env_store = request.app.state.env_store
env_store = getattr(request.app.state, "env_store", None)
if not env_store:
return {"enabled": False, "feeds": []}
# Will be populated in Phase 1 when env_store exists
return {
"enabled": True,
"feeds": [],
"feeds": env_store.get_source_health(),
}
@router.get("/env/active")
async def get_active_env(request: Request):
"""Get active environmental conditions."""
env_store = request.app.state.env_store
"""Get active environmental events."""
env_store = getattr(request.app.state, "env_store", None)
if not env_store:
return []
# Will be populated in Phase 1
return []
return env_store.get_active()
@router.get("/env/swpc")
async def get_swpc_data(request: Request):
"""Get SWPC space weather data."""
env_store = request.app.state.env_store
env_store = getattr(request.app.state, "env_store", None)
if not env_store:
return {"enabled": False}
# Will be populated in Phase 1
status = env_store.get_swpc_status()
if not status:
return {"enabled": False}
return {
"enabled": True,
**status,
}
@router.get("/env/propagation")
async def get_rf_propagation(request: Request):
"""Get combined HF + UHF propagation data for dashboard."""
env_store = getattr(request.app.state, "env_store", None)
if not env_store:
return {"hf": {}, "uhf_ducting": {}}
return env_store.get_rf_propagation()
@router.get("/env/ducting")
async def get_ducting_data(request: Request):
"""Get tropospheric ducting assessment."""
env_store = getattr(request.app.state, "env_store", None)
if not env_store:
return {"enabled": False}
status = env_store.get_ducting_status()
if not status:
return {"enabled": False}
return {
"enabled": True,
**status,
}

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View file

@ -8,8 +8,8 @@
<link rel="preconnect" href="https://fonts.googleapis.com">
<link rel="preconnect" href="https://fonts.gstatic.com" crossorigin>
<link href="https://fonts.googleapis.com/css2?family=JetBrains+Mono:wght@400;500;600;700&display=swap" rel="stylesheet">
<script type="module" crossorigin src="/assets/index-DnO02g6m.js"></script>
<link rel="stylesheet" crossorigin href="/assets/index-DdqEB3wX.css">
<script type="module" crossorigin src="/assets/index-CELmCk_K.js"></script>
<link rel="stylesheet" crossorigin href="/assets/index-DKYlTqQ1.css">
</head>
<body>
<div id="root"></div>

1
meshai/env/__init__.py vendored Normal file
View file

@ -0,0 +1 @@
"""Environmental feeds package."""

273
meshai/env/ducting.py vendored Normal file
View file

@ -0,0 +1,273 @@
"""Tropospheric ducting assessment adapter using Open-Meteo GFS."""
import json
import logging
import math
import time
from datetime import datetime
from typing import TYPE_CHECKING
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
if TYPE_CHECKING:
from ..config import DuctingConfig
logger = logging.getLogger(__name__)
# Pressure levels and approximate heights (meters)
PRESSURE_LEVELS = {
1000: 110, # ~110m
925: 760, # ~760m
850: 1500, # ~1500m
700: 3000, # ~3000m
}
class DuctingAdapter:
"""Tropospheric ducting assessment from Open-Meteo GFS pressure levels."""
def __init__(self, config: "DuctingConfig"):
self._lat = config.latitude or 42.56
self._lon = config.longitude or -114.47
self._tick_interval = config.tick_seconds or 10800 # 3 hours
self._last_tick = 0.0
self._status = {}
self._consecutive_errors = 0
self._last_error = None
self._is_loaded = False
def tick(self) -> bool:
"""Execute one polling tick.
Returns:
True if data changed
"""
now = time.time()
# Check tick interval
if now - self._last_tick < self._tick_interval:
return False
self._last_tick = now
return self._fetch()
def _fetch(self) -> bool:
"""Fetch GFS data from Open-Meteo API.
Returns:
True on success
"""
# Build API URL
hourly_vars = [
"temperature_1000hPa", "temperature_925hPa",
"temperature_850hPa", "temperature_700hPa",
"relative_humidity_1000hPa", "relative_humidity_925hPa",
"relative_humidity_850hPa", "relative_humidity_700hPa",
"surface_pressure",
]
url = (
f"https://api.open-meteo.com/v1/gfs"
f"?latitude={self._lat}&longitude={self._lon}"
f"&hourly={','.join(hourly_vars)}"
f"&forecast_days=1&timezone=auto"
)
headers = {
"User-Agent": "MeshAI/1.0",
"Accept": "application/json",
}
try:
req = Request(url, headers=headers)
with urlopen(req, timeout=30) as resp:
data = json.loads(resp.read().decode("utf-8"))
except HTTPError as e:
logger.warning(f"Ducting API HTTP error: {e.code}")
self._last_error = f"HTTP {e.code}"
self._consecutive_errors += 1
return False
except URLError as e:
logger.warning(f"Ducting API connection error: {e.reason}")
self._last_error = str(e.reason)
self._consecutive_errors += 1
return False
except Exception as e:
logger.warning(f"Ducting API error: {e}")
self._last_error = str(e)
self._consecutive_errors += 1
return False
# Parse response
try:
self._parse_response(data)
self._consecutive_errors = 0
self._last_error = None
self._is_loaded = True
logger.info(f"Ducting assessment updated: {self._status.get('condition', 'unknown')}")
return True
except Exception as e:
logger.warning(f"Ducting parse error: {e}")
self._last_error = f"parse error: {e}"
return False
def _parse_response(self, data):
"""Parse Open-Meteo response and compute ducting assessment."""
hourly = data.get("hourly", {})
times = hourly.get("time", [])
if not times:
raise ValueError("No time data in response")
# Find index closest to current time
now = datetime.now()
idx = 0
for i, t in enumerate(times):
try:
dt = datetime.fromisoformat(t)
if dt <= now:
idx = i
except Exception:
pass
# Extract values for current hour
def get_val(key):
vals = hourly.get(key, [])
return vals[idx] if idx < len(vals) else None
# Build profile for each pressure level
profile = []
gradients = []
levels = sorted(PRESSURE_LEVELS.keys(), reverse=True) # 1000, 925, 850, 700
for i, pressure in enumerate(levels):
temp_key = f"temperature_{pressure}hPa"
rh_key = f"relative_humidity_{pressure}hPa"
t_celsius = get_val(temp_key)
rh = get_val(rh_key)
if t_celsius is None or rh is None:
continue
height_m = PRESSURE_LEVELS[pressure]
# Calculate radio refractivity N
t_kelvin = t_celsius + 273.15
# Saturation vapor pressure (Magnus formula)
e_sat = 6.112 * math.exp(17.67 * t_celsius / (t_celsius + 243.5))
# Actual vapor pressure
e = (rh / 100.0) * e_sat
# Radio refractivity
n = 77.6 * (pressure / t_kelvin) + 3.73e5 * (e / t_kelvin**2)
# Modified refractivity (accounts for Earth curvature)
h_km = height_m / 1000.0
m = n + 157.0 * h_km
profile.append({
"level_hPa": pressure,
"height_m": height_m,
"N": round(n, 1),
"M": round(m, 1),
"T_C": round(t_celsius, 1),
"RH": round(rh, 1),
})
# Compute gradients between adjacent levels
for i in range(len(profile) - 1):
lower = profile[i]
upper = profile[i + 1]
dM = upper["M"] - lower["M"]
dz = (upper["height_m"] - lower["height_m"]) / 1000.0 # km
if dz > 0:
gradient = dM / dz
gradients.append({
"from_level": lower["level_hPa"],
"to_level": upper["level_hPa"],
"from_height_m": lower["height_m"],
"to_height_m": upper["height_m"],
"gradient": round(gradient, 1),
})
# Classify conditions based on minimum gradient
# Standard atmosphere: ~118 M-units/km
# Normal: > 79
# Super-refraction: 0 to 79
# Ducting: < 0 (negative = trapping layer)
min_gradient = min((g["gradient"] for g in gradients), default=118)
min_gradient_layer = None
for g in gradients:
if g["gradient"] == min_gradient:
min_gradient_layer = g
break
if min_gradient < 0:
# Ducting detected
if min_gradient_layer and min_gradient_layer["from_level"] == 1000:
condition = "surface_duct"
else:
condition = "elevated_duct"
duct_base = min_gradient_layer["from_height_m"] if min_gradient_layer else 0
duct_thickness = (
min_gradient_layer["to_height_m"] - min_gradient_layer["from_height_m"]
if min_gradient_layer else 0
)
assessment = "Ducting -- extended UHF range likely"
elif min_gradient < 79:
condition = "super_refraction"
duct_base = None
duct_thickness = None
assessment = "Enhanced range possible"
else:
condition = "normal"
duct_base = None
duct_thickness = None
assessment = "Normal propagation"
# Update status
self._status = {
"condition": condition,
"min_gradient": round(min_gradient, 1),
"duct_thickness_m": duct_thickness,
"duct_base_m": duct_base,
"profile": profile,
"gradients": gradients,
"assessment": assessment,
"last_update": times[idx] if idx < len(times) else None,
"fetched_at": time.time(),
"location": {
"lat": self._lat,
"lon": self._lon,
},
}
def get_status(self) -> dict:
"""Get current ducting status."""
return self._status
@property
def health_status(self) -> dict:
"""Get adapter health status."""
return {
"source": "ducting",
"is_loaded": self._is_loaded,
"last_error": str(self._last_error) if self._last_error else None,
"consecutive_errors": self._consecutive_errors,
"event_count": 0,
"last_fetch": self._last_tick,
}

193
meshai/env/nws.py vendored Normal file
View file

@ -0,0 +1,193 @@
"""NWS Active Alerts adapter."""
import json
import logging
import time
from datetime import datetime
from typing import TYPE_CHECKING
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
if TYPE_CHECKING:
from ..config import NWSConfig
logger = logging.getLogger(__name__)
class NWSAlertsAdapter:
"""NWS Active Alerts -- polls api.weather.gov"""
def __init__(self, config: "NWSConfig"):
self._areas = config.areas or ["ID"]
self._user_agent = config.user_agent or "(meshai, ops@example.com)"
self._severity_min = config.severity_min or "moderate"
self._tick_interval = config.tick_seconds or 60
self._last_tick = 0.0
self._events = []
self._consecutive_errors = 0
self._last_error = None
self._backoff_until = 0.0
self._is_loaded = False
def tick(self) -> bool:
"""Execute one polling tick.
Returns:
True if data changed
"""
now = time.time()
# Rate limit backoff
if now < self._backoff_until:
return False
# Check tick interval
if now - self._last_tick < self._tick_interval:
return False
self._last_tick = now
return self._fetch()
def _fetch(self) -> bool:
"""Fetch alerts from NWS API.
Returns:
True if data changed
"""
areas = ",".join(self._areas)
url = f"https://api.weather.gov/alerts/active?area={areas}"
headers = {
"User-Agent": self._user_agent,
"Accept": "application/geo+json",
}
try:
req = Request(url, headers=headers)
with urlopen(req, timeout=15) as resp:
data = json.loads(resp.read().decode("utf-8"))
except HTTPError as e:
if e.code == 429:
self._backoff_until = time.time() + 5
logger.warning("NWS rate limited, backing off 5s")
else:
logger.warning(f"NWS HTTP error: {e.code}")
self._last_error = f"HTTP {e.code}"
self._consecutive_errors += 1
return False
except URLError as e:
logger.warning(f"NWS connection error: {e.reason}")
self._last_error = str(e.reason)
self._consecutive_errors += 1
return False
except Exception as e:
logger.warning(f"NWS fetch error: {e}")
self._last_error = str(e)
self._consecutive_errors += 1
return False
# Parse response
features = data.get("features", [])
new_events = []
# Severity levels for filtering
severity_levels = ["unknown", "minor", "moderate", "severe", "extreme"]
try:
min_idx = severity_levels.index(self._severity_min.lower())
except ValueError:
min_idx = 2 # default to moderate
for feature in features:
props = feature.get("properties", {})
# Severity filtering
severity = (props.get("severity") or "Unknown").lower()
try:
sev_idx = severity_levels.index(severity)
except ValueError:
sev_idx = 0
if sev_idx < min_idx:
continue
# Parse timestamps
onset = self._parse_iso(props.get("onset"))
expires = self._parse_iso(props.get("expires"))
event = {
"source": "nws",
"event_id": props.get("id", ""),
"event_type": props.get("event", "Unknown"),
"severity": severity,
"headline": props.get("headline", ""),
"description": (props.get("description") or "")[:500],
"onset": onset,
"expires": expires,
"areas": props.get("geocode", {}).get("UGC", []),
"area_desc": props.get("areaDesc", ""),
"fetched_at": time.time(),
}
# Try to get centroid from geometry
geom = feature.get("geometry")
if geom and geom.get("coordinates"):
try:
coords = geom["coordinates"]
if geom.get("type") == "Polygon" and coords:
# Compute centroid of first ring
ring = coords[0]
lat_sum = sum(c[1] for c in ring)
lon_sum = sum(c[0] for c in ring)
event["lat"] = lat_sum / len(ring)
event["lon"] = lon_sum / len(ring)
except Exception:
pass
new_events.append(event)
# Check if data changed
old_ids = {e["event_id"] for e in self._events}
new_ids = {e["event_id"] for e in new_events}
changed = old_ids != new_ids
self._events = new_events
self._consecutive_errors = 0
self._last_error = None
self._is_loaded = True
if changed:
logger.info(f"NWS alerts updated: {len(new_events)} active")
return changed
def _parse_iso(self, iso_str: str) -> float:
"""Parse ISO timestamp to epoch float."""
if not iso_str:
return 0.0
try:
# Handle various ISO formats
if iso_str.endswith("Z"):
iso_str = iso_str[:-1] + "+00:00"
dt = datetime.fromisoformat(iso_str)
return dt.timestamp()
except Exception:
return 0.0
def get_events(self) -> list:
"""Get current events."""
return self._events
@property
def health_status(self) -> dict:
"""Get adapter health status."""
return {
"source": "nws",
"is_loaded": self._is_loaded,
"last_error": str(self._last_error) if self._last_error else None,
"consecutive_errors": self._consecutive_errors,
"event_count": len(self._events),
"last_fetch": self._last_tick,
}

168
meshai/env/store.py vendored Normal file
View file

@ -0,0 +1,168 @@
"""Environmental data store with tick-based adapter polling."""
import logging
import time
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ..config import EnvironmentalConfig
logger = logging.getLogger(__name__)
class EnvironmentalStore:
"""Cache and tick-driver for all environmental feed adapters."""
def __init__(self, config: "EnvironmentalConfig"):
self._adapters = {} # name -> adapter instance
self._events = {} # (source, event_id) -> event dict
self._swpc_status = {} # Kp/SFI/scales snapshot
self._ducting_status = {} # tropo ducting assessment
self._mesh_zones = config.nws_zones or []
# Create adapter instances based on config
if config.nws.enabled:
from .nws import NWSAlertsAdapter
self._adapters["nws"] = NWSAlertsAdapter(config.nws)
if config.swpc.enabled:
from .swpc import SWPCAdapter
self._adapters["swpc"] = SWPCAdapter(config.swpc)
if config.ducting.enabled:
from .ducting import DuctingAdapter
self._adapters["ducting"] = DuctingAdapter(config.ducting)
logger.info(f"EnvironmentalStore initialized with {len(self._adapters)} adapters")
def refresh(self) -> bool:
"""Called every second from main loop. Ticks each adapter.
Returns:
True if any data changed
"""
changed = False
for name, adapter in self._adapters.items():
try:
if adapter.tick():
changed = True
self._ingest(name, adapter)
except Exception as e:
logger.warning("Env adapter %s error: %s", name, e)
self._purge_expired()
return changed
def _ingest(self, name: str, adapter):
"""Ingest data from an adapter after it ticks."""
if name == "swpc":
self._swpc_status = adapter.get_status()
# Also ingest any alert events (R-scale >= 3)
for evt in adapter.get_events():
self._events[(evt["source"], evt["event_id"])] = evt
elif name == "ducting":
self._ducting_status = adapter.get_status()
else:
for evt in adapter.get_events():
self._events[(evt["source"], evt["event_id"])] = evt
def _purge_expired(self):
"""Remove expired events."""
now = time.time()
expired = [
k for k, v in self._events.items()
if v.get("expires") and v["expires"] < now
]
for k in expired:
del self._events[k]
def get_active(self, source: str = None) -> list:
"""Get active events, optionally filtered by source.
Args:
source: Filter to specific source (nws, swpc, etc.)
Returns:
List of event dicts sorted by fetched_at (newest first)
"""
events = list(self._events.values())
if source:
events = [e for e in events if e["source"] == source]
return sorted(events, key=lambda e: e.get("fetched_at", 0), reverse=True)
def get_for_zones(self, zones: list) -> list:
"""Get events affecting specific NWS zones.
Args:
zones: List of UGC zone codes (e.g., ["IDZ016", "IDZ030"])
Returns:
List of events with overlapping zone coverage
"""
zone_set = set(zones)
return [
e for e in self._events.values()
if set(e.get("areas", [])) & zone_set
]
def get_swpc_status(self) -> dict:
"""Get current SWPC space weather status."""
return self._swpc_status
def get_ducting_status(self) -> dict:
"""Get current tropospheric ducting status."""
return self._ducting_status
def get_rf_propagation(self) -> dict:
"""Combined HF + UHF propagation summary for dashboard/LLM."""
return {
"hf": self._swpc_status,
"uhf_ducting": self._ducting_status,
}
def get_summary(self) -> str:
"""Compact text block for LLM context injection."""
lines = []
lines.append(f"### Current Conditions (as of {time.strftime('%H:%M:%S MT')}):")
# NWS alerts
nws = self.get_active(source="nws")
if nws:
lines.append(f"NWS: {len(nws)} active alert(s):")
for a in nws[:3]:
lines.append(f" - {a['event_type']}: {a['headline'][:120]}")
else:
lines.append("NWS: No active alerts for mesh area.")
# HF
s = self._swpc_status
if s:
kp = s.get("kp_current", "?")
sfi = s.get("sfi", "?")
assessment = s.get("band_assessment", "Unknown")
lines.append(f"HF: {assessment} -- SFI {sfi}, Kp {kp}")
warnings = s.get("active_warnings", [])
if warnings:
for w in warnings[:2]:
lines.append(f" Warning: {w}")
else:
lines.append("HF: Space weather data not available.")
# UHF ducting
d = self._ducting_status
if d:
condition = d.get("condition", "unknown")
if condition == "normal":
lines.append("UHF Ducting: Normal propagation, no ducting detected.")
elif condition in ("super_refraction", "ducting", "surface_duct", "elevated_duct"):
gradient = d.get("min_gradient", "?")
thickness = d.get("duct_thickness_m", "?")
lines.append(f"UHF Ducting: {condition.replace('_', ' ').title()} detected")
lines.append(f" dM/dz: {gradient} M-units/km, duct ~{thickness}m thick")
lines.append(" Extended range likely on 906 MHz -- expect distant nodes")
return "\n".join(lines)
def get_source_health(self) -> list:
"""Get health status for all adapters."""
return [a.health_status for a in self._adapters.values()]

256
meshai/env/swpc.py vendored Normal file
View file

@ -0,0 +1,256 @@
"""NOAA Space Weather Prediction Center adapter."""
import json
import logging
import time
from typing import TYPE_CHECKING
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
if TYPE_CHECKING:
from ..config import SWPCConfig
logger = logging.getLogger(__name__)
class SWPCAdapter:
"""NOAA Space Weather -- multi-endpoint with staggered ticks."""
# Endpoint definitions: (url, interval_seconds)
ENDPOINTS = {
"scales": ("https://services.swpc.noaa.gov/products/noaa-scales.json", 300),
"kp": ("https://services.swpc.noaa.gov/products/noaa-planetary-k-index.json", 600),
"alerts": ("https://services.swpc.noaa.gov/products/alerts.json", 120),
"f107": ("https://services.swpc.noaa.gov/json/f107_cm_flux.json", 86400),
}
def __init__(self, config: "SWPCConfig"):
self._last_tick = {} # endpoint -> last_tick timestamp
self._status = {}
self._events = []
self._consecutive_errors = 0
self._last_error = None
self._is_loaded = False
# Initialize tick times to 0
for endpoint in self.ENDPOINTS:
self._last_tick[endpoint] = 0.0
def tick(self) -> bool:
"""Execute one polling tick.
Returns:
True if data changed
"""
changed = False
now = time.time()
for endpoint, (url, interval) in self.ENDPOINTS.items():
if now - self._last_tick[endpoint] >= interval:
self._last_tick[endpoint] = now
if self._fetch_endpoint(endpoint, url):
changed = True
if changed:
self._update_assessment()
return changed
def _fetch_endpoint(self, endpoint: str, url: str) -> bool:
"""Fetch a single endpoint.
Returns:
True on success
"""
headers = {
"User-Agent": "MeshAI/1.0",
"Accept": "application/json",
}
try:
req = Request(url, headers=headers)
with urlopen(req, timeout=15) as resp:
data = json.loads(resp.read().decode("utf-8"))
except HTTPError as e:
logger.warning(f"SWPC {endpoint} HTTP error: {e.code}")
self._last_error = f"{endpoint}: HTTP {e.code}"
self._consecutive_errors += 1
return False
except URLError as e:
logger.warning(f"SWPC {endpoint} connection error: {e.reason}")
self._last_error = f"{endpoint}: {e.reason}"
self._consecutive_errors += 1
return False
except Exception as e:
logger.warning(f"SWPC {endpoint} error: {e}")
self._last_error = f"{endpoint}: {e}"
self._consecutive_errors += 1
return False
# Parse based on endpoint
try:
if endpoint == "scales":
self._parse_scales(data)
elif endpoint == "kp":
self._parse_kp(data)
elif endpoint == "alerts":
self._parse_alerts(data)
elif endpoint == "f107":
self._parse_f107(data)
self._consecutive_errors = 0
self._last_error = None
self._is_loaded = True
return True
except Exception as e:
logger.warning(f"SWPC {endpoint} parse error: {e}")
self._last_error = f"{endpoint}: parse error"
return False
def _parse_scales(self, data):
"""Parse noaa-scales.json.
Data format: {""-1": {...}, "0": {...}, "1": {...}, ...}
"0" is current.
"""
current = data.get("0", {})
r_data = current.get("R", {})
s_data = current.get("S", {})
g_data = current.get("G", {})
# Handle empty string or None Scale values
def parse_scale(val):
if val is None or val == "":
return 0
try:
return int(val)
except (ValueError, TypeError):
return 0
self._status["r_scale"] = parse_scale(r_data.get("Scale"))
self._status["s_scale"] = parse_scale(s_data.get("Scale"))
self._status["g_scale"] = parse_scale(g_data.get("Scale"))
def _parse_kp(self, data):
"""Parse noaa-planetary-k-index.json.
Data format: array of arrays
First row is header: ["time_tag", "Kp", "a_running", "station_count"]
Last row is most recent.
"""
if not data or len(data) < 2:
return
# Find Kp column index from header
header = data[0]
try:
kp_idx = header.index("Kp")
except ValueError:
kp_idx = 1
# Get last row
last_row = data[-1]
if len(last_row) > kp_idx:
try:
self._status["kp_current"] = float(last_row[kp_idx])
except (ValueError, TypeError):
pass
# Get timestamp
if len(last_row) > 0:
self._status["kp_timestamp"] = last_row[0]
def _parse_alerts(self, data):
"""Parse alerts.json.
Data format: array of objects with product_id, issue_datetime, message
"""
warnings = []
if isinstance(data, list):
for alert in data[:5]: # Keep most recent 5
message = alert.get("message", "")
# Extract first line as headline
headline = message.split("\n")[0].strip()
if headline:
warnings.append(headline)
self._status["active_warnings"] = warnings
def _parse_f107(self, data):
"""Parse f107_cm_flux.json.
Data format: array of objects with time_tag, flux
"""
if not data:
return
# Get most recent entry (last in list)
if isinstance(data, list) and data:
last = data[-1]
if isinstance(last, dict):
try:
self._status["sfi"] = float(last.get("flux", 0))
except (ValueError, TypeError):
pass
def _update_assessment(self):
"""Compute band assessment from SFI and Kp."""
sfi = self._status.get("sfi", 0)
kp = self._status.get("kp_current", 0)
# Band assessment formula
if sfi > 150 and kp <= 1:
assessment = "Excellent"
detail = "Upper HF bands (10m-20m) open, solid DX conditions"
elif sfi >= 100 and kp <= 3:
assessment = "Good"
detail = "Upper HF bands (10m-20m) open, solid DX conditions"
elif sfi >= 80 and kp <= 4:
assessment = "Fair"
detail = "Mid HF bands (20m-40m) usable, upper bands marginal"
else:
assessment = "Poor"
detail = "HF conditions degraded, stick to lower bands (40m-80m)"
self._status["band_assessment"] = assessment
self._status["band_detail"] = detail
# Generate events for R-scale >= 3
self._events = []
r_scale = self._status.get("r_scale", 0)
if r_scale >= 3:
self._events.append({
"source": "swpc",
"event_id": f"swpc_r{r_scale}_{int(time.time())}",
"event_type": f"R{r_scale} Radio Blackout",
"severity": "warning" if r_scale >= 3 else "advisory",
"headline": f"R{r_scale} HF Radio Blackout -- HF comms degraded",
"expires": time.time() + 3600, # 1hr TTL
"areas": [],
"fetched_at": time.time(),
})
def get_status(self) -> dict:
"""Get current SWPC status."""
return self._status
def get_events(self) -> list:
"""Get current alert events."""
return self._events
@property
def health_status(self) -> dict:
"""Get adapter health status."""
return {
"source": "swpc",
"is_loaded": self._is_loaded,
"last_error": str(self._last_error) if self._last_error else None,
"consecutive_errors": self._consecutive_errors,
"event_count": len(self._events),
"last_fetch": max(self._last_tick.values()) if self._last_tick else 0,
}

View file

@ -44,6 +44,7 @@ class MeshAI:
self.mesh_reporter = None
self.subscription_manager = None
self.alert_engine = None
self.env_store = None # Environmental feeds store
self._last_sub_check: float = 0.0
self.router: Optional[MessageRouter] = None
self.responder: Optional[Responder] = None
@ -126,6 +127,28 @@ class MeshAI:
except Exception:
pass
# Environmental feed refresh
if self.env_store:
try:
env_changed = self.env_store.refresh()
if env_changed and self.alert_engine:
env_alerts = self.alert_engine.check_environmental(self.env_store)
if env_alerts:
await self._dispatch_alerts(env_alerts)
if self.broadcaster:
for ea in env_alerts:
await self.broadcaster.broadcast("alert_fired", ea)
# Broadcast env updates to dashboard
if env_changed and self.broadcaster:
await self.broadcaster.broadcast("env_update", {
"active_count": len(self.env_store.get_active()),
"swpc": self.env_store.get_swpc_status(),
"ducting": self.env_store.get_ducting_status(),
})
except Exception as e:
logger.debug("Env refresh error: %s", e)
# Check scheduled subscriptions (every 60 seconds)
if self.subscription_manager and self.mesh_reporter:
if time.time() - self._last_sub_check >= 60:
@ -310,6 +333,15 @@ class MeshAI:
)
logger.info(f"Alert engine initialized (critical: {mi.critical_nodes}, channel: {mi.alert_channel})")
# Environmental feeds
env_cfg = self.config.environmental
if env_cfg.enabled:
from .env.store import EnvironmentalStore
self.env_store = EnvironmentalStore(config=env_cfg)
logger.info(f"Environmental feeds enabled ({len(self.env_store._adapters)} adapters)")
else:
self.env_store = None
# Knowledge base (optional - Qdrant with SQLite fallback)
kb_cfg = self.config.knowledge
self.knowledge = None
@ -355,6 +387,7 @@ class MeshAI:
data_store=self.data_store,
health_engine=self.health_engine,
subscription_manager=self.subscription_manager,
env_store=self.env_store,
)
# Message router
@ -366,6 +399,7 @@ class MeshAI:
source_manager=self.data_store,
health_engine=self.health_engine,
mesh_reporter=self.mesh_reporter,
env_store=self.env_store,
)
# Responder

View file

@ -87,6 +87,15 @@ _MESH_PHRASES = [
"how are",
]
# Keywords that indicate environmental/weather/propagation questions
_ENV_KEYWORDS = {
"weather", "alert", "warning", "fire", "smoke", "road", "closure",
"snow", "avalanche", "avy", "solar", "hf", "propagation", "kp",
"aurora", "blackout", "flood", "stream", "river", "ducting",
"tropo", "duct", "uhf", "vhf", "906", "band", "conditions",
"forecast", "sfi", "ionosphere", "geomagnetic", "storm",
}
# City name to region mapping (hardcoded fallback)
# City/alias mapping now built from config - see _build_alias_map()
@ -187,6 +196,7 @@ class MessageRouter:
source_manager=None,
health_engine=None,
mesh_reporter=None,
env_store=None,
):
self.config = config
self.connector = connector
@ -199,6 +209,7 @@ class MessageRouter:
self.source_manager = source_manager
self.health_engine = health_engine
self.mesh_reporter = mesh_reporter
self.env_store = env_store
self.continuations = ContinuationState(max_continuations=3)
# Per-user mesh context tracking for follow-up handling
@ -737,6 +748,16 @@ class MessageRouter:
# Not a mesh question
self._update_user_mesh_context(message.sender_id, is_mesh=False)
# 7. Environmental context injection
if self.env_store:
query_lower = query.lower() if query else ""
env_relevant = any(kw in query_lower for kw in _ENV_KEYWORDS)
# Also inject env context if mesh context is being injected
if env_relevant or should_inject_mesh:
env_summary = self.env_store.get_summary()
if env_summary:
system_prompt += "\n\n" + env_summary
# DEBUG: Log system prompt status
logger.debug(f"System prompt length: {len(system_prompt)} chars")