feat(dashboard): embedded FastAPI backend with REST API + WebSocket

- FastAPI runs in MeshAI asyncio loop (no separate process)
- REST API: /api/status, /api/health, /api/nodes, /api/edges,
  /api/regions, /api/sources, /api/config, /api/alerts
- WebSocket at /ws/live pushes health updates and alerts
- Config CRUD: GET/PUT per section with validation and save
- DashboardConfig with port/host in config.yaml
This commit is contained in:
K7ZVX 2026-05-12 15:47:58 +00:00
commit 3ec09ad158
17 changed files with 1140 additions and 103 deletions

View file

@ -78,7 +78,7 @@ USER meshai
VOLUME ["/data"]
# Expose ttyd web config port
EXPOSE 7682
EXPOSE 7682 8080
# Health check - verify bot process is alive via PID file
HEALTHCHECK --interval=30s --timeout=10s --start-period=10s --retries=3 \

View file

@ -123,3 +123,9 @@ mesh_intelligence:
battery_warning_percent: 20
infra_overrides: []
region_labels: {}
# === WEB DASHBOARD ===
dashboard:
enabled: true
port: 8080
host: "0.0.0.0"

View file

@ -35,6 +35,8 @@ services:
ports:
# Web-based config interface (ttyd)
- "7682:7682"
# Dashboard API
- "8080:8080"
volumes:
# Persistent data (database, config)

View file

@ -257,6 +257,16 @@ class MeshIntelligenceConfig:
alert_rules: AlertRulesConfig = field(default_factory=AlertRulesConfig)
@dataclass
class DashboardConfig:
"""Web dashboard settings."""
enabled: bool = True
port: int = 8080
host: str = "0.0.0.0"
@dataclass
class Config:
"""Main configuration container."""
@ -274,6 +284,7 @@ class Config:
knowledge: KnowledgeConfig = field(default_factory=KnowledgeConfig)
mesh_sources: list[MeshSourceConfig] = field(default_factory=list)
mesh_intelligence: MeshIntelligenceConfig = field(default_factory=MeshIntelligenceConfig)
dashboard: DashboardConfig = field(default_factory=DashboardConfig)
_config_path: Optional[Path] = field(default=None, repr=False)

View file

@ -0,0 +1 @@
"""Dashboard package for MeshAI web interface."""

View file

@ -0,0 +1 @@
"""Dashboard API routes package."""

View file

@ -0,0 +1,85 @@
"""Alert API routes."""
from fastapi import APIRouter, Request
router = APIRouter(tags=["alerts"])
@router.get("/alerts/active")
async def get_active_alerts(request: Request):
"""Get currently active alerts."""
alert_engine = request.app.state.alert_engine
if not alert_engine:
return []
# Get recent alerts from alert engine if it has internal state
alerts = []
# Check for AlertState or similar if available
if hasattr(alert_engine, "get_active_alerts"):
try:
raw_alerts = alert_engine.get_active_alerts()
for alert in raw_alerts:
alerts.append({
"type": alert.get("type", "unknown"),
"severity": alert.get("severity", "info"),
"message": alert.get("message", ""),
"timestamp": alert.get("timestamp"),
"scope_type": alert.get("scope_type"),
"scope_value": alert.get("scope_value"),
})
except Exception:
pass
elif hasattr(alert_engine, "_recent_alerts"):
try:
for alert in alert_engine._recent_alerts:
alerts.append({
"type": alert.get("type", "unknown"),
"severity": alert.get("severity", "info"),
"message": alert.get("message", ""),
"timestamp": alert.get("timestamp"),
})
except Exception:
pass
return alerts
@router.get("/alerts/history")
async def get_alert_history(
request: Request,
limit: int = 50,
offset: int = 0,
):
"""Get historical alerts with pagination."""
# Historical alert data would come from SQLite
# For now, return empty list
return []
@router.get("/subscriptions")
async def get_subscriptions(request: Request):
"""Get all alert subscriptions."""
subscription_manager = request.app.state.subscription_manager
if not subscription_manager:
return []
try:
subs = subscription_manager.get_all_subs()
return [
{
"id": sub["id"],
"user_id": sub["user_id"],
"sub_type": sub["sub_type"],
"schedule_time": sub.get("schedule_time"),
"schedule_day": sub.get("schedule_day"),
"scope_type": sub.get("scope_type", "mesh"),
"scope_value": sub.get("scope_value"),
"enabled": sub.get("enabled", 1) == 1,
}
for sub in subs
]
except Exception:
return []

View file

@ -0,0 +1,183 @@
"""Configuration API routes."""
import logging
from fastapi import APIRouter, HTTPException, Request
from meshai.config import (
Config,
_dataclass_to_dict,
_dict_to_dataclass,
load_config,
save_config,
)
logger = logging.getLogger(__name__)
router = APIRouter(tags=["config"])
# Sections that require restart when changed
RESTART_REQUIRED_SECTIONS = {
"connection",
"llm",
"mesh_sources",
"meshmonitor",
"dashboard",
}
# Valid config section names
VALID_SECTIONS = {
"bot",
"connection",
"response",
"history",
"memory",
"context",
"commands",
"llm",
"weather",
"meshmonitor",
"knowledge",
"mesh_sources",
"mesh_intelligence",
"dashboard",
}
@router.get("/config")
async def get_full_config(request: Request):
"""Get full configuration."""
config = request.app.state.config
return _dataclass_to_dict(config)
@router.get("/config/{section}")
async def get_config_section(section: str, request: Request):
"""Get a specific configuration section."""
if section not in VALID_SECTIONS:
raise HTTPException(
status_code=404,
detail=f"Section '{section}' not found. Valid sections: {', '.join(sorted(VALID_SECTIONS))}"
)
config = request.app.state.config
if not hasattr(config, section):
raise HTTPException(status_code=404, detail=f"Section '{section}' not found")
section_data = getattr(config, section)
# Handle list types (mesh_sources)
if isinstance(section_data, list):
return [
_dataclass_to_dict(item) if hasattr(item, "__dataclass_fields__") else item
for item in section_data
]
# Handle dataclass types
if hasattr(section_data, "__dataclass_fields__"):
return _dataclass_to_dict(section_data)
return section_data
@router.put("/config/{section}")
async def update_config_section(section: str, request: Request):
"""Update a configuration section."""
if section not in VALID_SECTIONS:
raise HTTPException(
status_code=404,
detail=f"Section '{section}' not found. Valid sections: {', '.join(sorted(VALID_SECTIONS))}"
)
config_path = request.app.state.config_path
if not config_path:
raise HTTPException(status_code=500, detail="Config path not set")
try:
body = await request.json()
except Exception as e:
raise HTTPException(status_code=422, detail=f"Invalid JSON: {e}")
try:
# Load fresh config from file to avoid conflicts
config = load_config(config_path)
# Get the section's dataclass type
field_info = Config.__dataclass_fields__.get(section)
if not field_info:
raise HTTPException(status_code=404, detail=f"Section '{section}' not found")
field_type = field_info.type
# Handle list types (mesh_sources)
if section == "mesh_sources":
from meshai.config import MeshSourceConfig
new_value = [
_dict_to_dataclass(MeshSourceConfig, item) if isinstance(item, dict) else item
for item in body
]
# Handle dataclass types
elif hasattr(field_type, "__dataclass_fields__"):
new_value = _dict_to_dataclass(field_type, body)
else:
new_value = body
# Set the section on config
setattr(config, section, new_value)
# Save config to file
save_config(config, config_path)
# Determine if restart is required
restart_required = section in RESTART_REQUIRED_SECTIONS
# Update live config if restart not required
if not restart_required:
request.app.state.config = config
logger.info(f"Config section '{section}' updated, restart_required={restart_required}")
return {"saved": True, "restart_required": restart_required}
except ValueError as e:
raise HTTPException(status_code=422, detail=str(e))
except Exception as e:
logger.error(f"Config update error: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/config/test-llm")
async def test_llm_connection(request: Request):
"""Test LLM backend connection."""
config = request.app.state.config
try:
# Create LLM backend based on config
api_key = config.resolve_api_key()
if not api_key:
return {"success": False, "error": "No API key configured"}
backend_name = config.llm.backend.lower()
if backend_name == "openai":
from meshai.backends import OpenAIBackend
backend = OpenAIBackend(config.llm, api_key, 0, 0)
elif backend_name == "anthropic":
from meshai.backends import AnthropicBackend
backend = AnthropicBackend(config.llm, api_key, 0, 0)
elif backend_name == "google":
from meshai.backends import GoogleBackend
backend = GoogleBackend(config.llm, api_key, 0, 0)
else:
return {"success": False, "error": f"Unknown backend: {backend_name}"}
# Send test prompt
response = await backend.generate("Reply with 'OK' if you can read this.", [])
await backend.close()
return {"success": True, "response": response}
except Exception as e:
logger.error(f"LLM test error: {e}")
return {"success": False, "error": str(e)}

View file

@ -0,0 +1,44 @@
"""Environmental data API routes (Phase 1 placeholder)."""
from fastapi import APIRouter, Request
router = APIRouter(tags=["environment"])
@router.get("/env/status")
async def get_env_status(request: Request):
"""Get environmental feeds status."""
env_store = request.app.state.env_store
if not env_store:
return {"enabled": False, "feeds": []}
# Will be populated in Phase 1 when env_store exists
return {
"enabled": True,
"feeds": [],
}
@router.get("/env/active")
async def get_active_env(request: Request):
"""Get active environmental conditions."""
env_store = request.app.state.env_store
if not env_store:
return []
# Will be populated in Phase 1
return []
@router.get("/env/swpc")
async def get_swpc_data(request: Request):
"""Get SWPC space weather data."""
env_store = request.app.state.env_store
if not env_store:
return {"enabled": False}
# Will be populated in Phase 1
return {"enabled": False}

View file

@ -0,0 +1,356 @@
"""Mesh health and node API routes."""
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, HTTPException, Request
router = APIRouter(tags=["mesh"])
def _serialize_health_score(score) -> dict:
"""Serialize a HealthScore object."""
return {
"composite": round(score.composite, 1),
"tier": score.tier,
"infrastructure": round(score.infrastructure, 1),
"utilization": round(score.utilization, 1),
"behavior": round(score.behavior, 1),
"power": round(score.power, 1),
"infra_online": score.infra_online,
"infra_total": score.infra_total,
"util_percent": round(score.util_percent, 1),
"flagged_nodes": score.flagged_nodes,
"battery_warnings": score.battery_warnings,
"solar_index": round(score.solar_index, 1),
}
def _serialize_region(region) -> dict:
"""Serialize a RegionHealth object."""
return {
"name": region.name,
"center_lat": region.center_lat,
"center_lon": region.center_lon,
"node_count": len(region.node_ids),
"locality_count": len(region.localities),
"score": _serialize_health_score(region.score),
"node_ids": region.node_ids,
}
def _format_timestamp(ts: Optional[float]) -> Optional[str]:
"""Format a Unix timestamp as ISO string."""
if not ts or ts <= 0:
return None
try:
return datetime.fromtimestamp(ts).isoformat()
except (ValueError, OSError):
return None
@router.get("/health")
async def get_health(request: Request):
"""Get mesh health data."""
health_engine = request.app.state.health_engine
if not health_engine or not health_engine.mesh_health:
return {
"score": 0,
"tier": "Unknown",
"message": "Health engine not ready",
}
health = health_engine.mesh_health
score = health.score
return {
"score": round(score.composite, 1),
"tier": score.tier,
"pillars": {
"infrastructure": round(score.infrastructure, 1),
"utilization": round(score.utilization, 1),
"behavior": round(score.behavior, 1),
"power": round(score.power, 1),
},
"infra_online": score.infra_online,
"infra_total": score.infra_total,
"util_percent": round(score.util_percent, 1),
"flagged_nodes": score.flagged_nodes,
"battery_warnings": score.battery_warnings,
"total_nodes": health.total_nodes,
"total_regions": health.total_regions,
"unlocated_count": len(health.unlocated_nodes),
"last_computed": _format_timestamp(health.last_computed),
"recommendations": [], # TODO: Add recommendations
}
@router.get("/nodes")
async def get_nodes(request: Request):
"""Get all nodes."""
data_store = request.app.state.data_store
health_engine = request.app.state.health_engine
if not data_store:
return []
try:
raw_nodes = data_store.get_all_nodes()
except Exception:
return []
nodes = []
for node in raw_nodes:
# Extract node_num from various formats
node_num = node.get("nodeNum") or node.get("num") or node.get("node_num")
if node_num is None:
node_id = node.get("node_id") or node.get("id")
if node_id and isinstance(node_id, str):
try:
node_num = int(node_id.lstrip("!"), 16)
except ValueError:
continue
if node_num is None:
continue
# Get health data if available
health_data = {}
if health_engine and health_engine.mesh_health:
node_health = health_engine.mesh_health.nodes.get(str(node_num))
if node_health:
health_data = {
"region": node_health.region,
"locality": node_health.locality,
"is_infrastructure": node_health.is_infrastructure,
"is_online": node_health.is_online,
"packet_count_24h": node_health.packet_count_24h,
}
# Build node dict
node_dict = {
"node_num": node_num,
"node_id_hex": f"!{node_num:08x}",
"short_name": node.get("shortName") or node.get("short_name") or "",
"long_name": node.get("longName") or node.get("long_name") or "",
"role": node.get("role") or "",
"latitude": node.get("latitude"),
"longitude": node.get("longitude"),
"last_heard": _format_timestamp(node.get("last_heard")),
"battery_level": node.get("battery_level") or node.get("batteryLevel"),
"voltage": node.get("voltage"),
"snr": node.get("snr"),
"firmware": node.get("firmware_version") or node.get("firmwareVersion") or "",
"hardware": node.get("hw_model") or node.get("hwModel") or "",
"uptime": node.get("uptime_seconds") or node.get("uptimeSeconds"),
"sources": node.get("_sources", []),
**health_data,
}
nodes.append(node_dict)
return nodes
@router.get("/nodes/{node_num}")
async def get_node_detail(node_num: int, request: Request):
"""Get detailed info for a specific node."""
data_store = request.app.state.data_store
health_engine = request.app.state.health_engine
if not data_store:
raise HTTPException(status_code=404, detail="Data store not available")
# Find the node
try:
raw_nodes = data_store.get_all_nodes()
except Exception:
raise HTTPException(status_code=500, detail="Failed to fetch nodes")
target_node = None
for node in raw_nodes:
n_num = node.get("nodeNum") or node.get("num") or node.get("node_num")
if n_num is None:
node_id = node.get("node_id") or node.get("id")
if node_id and isinstance(node_id, str):
try:
n_num = int(node_id.lstrip("!"), 16)
except ValueError:
continue
if n_num == node_num:
target_node = node
break
if not target_node:
raise HTTPException(status_code=404, detail=f"Node {node_num} not found")
# Get health data
health_data = {}
if health_engine and health_engine.mesh_health:
node_health = health_engine.mesh_health.nodes.get(str(node_num))
if node_health:
health_data = {
"region": node_health.region,
"locality": node_health.locality,
"is_infrastructure": node_health.is_infrastructure,
"is_online": node_health.is_online,
"packet_count_24h": node_health.packet_count_24h,
"text_packet_count_24h": node_health.text_packet_count_24h,
"non_text_packets": node_health.non_text_packets,
"has_solar": node_health.has_solar,
}
# Get neighbors from edges
neighbors = []
try:
edges = data_store.get_all_edges()
for edge in edges:
from_num = edge.get("from_node") or edge.get("from")
to_num = edge.get("to_node") or edge.get("to")
if from_num == node_num:
neighbors.append({
"node_num": to_num,
"snr": edge.get("snr"),
})
elif to_num == node_num:
neighbors.append({
"node_num": from_num,
"snr": edge.get("snr"),
})
except Exception:
pass
return {
"node_num": node_num,
"node_id_hex": f"!{node_num:08x}",
"short_name": target_node.get("shortName") or target_node.get("short_name") or "",
"long_name": target_node.get("longName") or target_node.get("long_name") or "",
"role": target_node.get("role") or "",
"latitude": target_node.get("latitude"),
"longitude": target_node.get("longitude"),
"last_heard": _format_timestamp(target_node.get("last_heard")),
"battery_level": target_node.get("battery_level") or target_node.get("batteryLevel"),
"voltage": target_node.get("voltage"),
"snr": target_node.get("snr"),
"firmware": target_node.get("firmware_version") or target_node.get("firmwareVersion") or "",
"hardware": target_node.get("hw_model") or target_node.get("hwModel") or "",
"uptime": target_node.get("uptime_seconds") or target_node.get("uptimeSeconds"),
"sources": target_node.get("_sources", []),
"neighbors": neighbors,
**health_data,
}
@router.get("/regions")
async def get_regions(request: Request):
"""Get region summaries."""
health_engine = request.app.state.health_engine
if not health_engine or not health_engine.mesh_health:
return []
regions = []
for region in health_engine.mesh_health.regions:
# Count online infrastructure
infra_online = 0
infra_total = 0
online_count = 0
for nid in region.node_ids:
node = health_engine.mesh_health.nodes.get(nid)
if node:
if node.is_online:
online_count += 1
if node.is_infrastructure:
infra_total += 1
if node.is_online:
infra_online += 1
regions.append({
"name": region.name,
"local_name": region.name, # Could be overridden by region_labels
"node_count": len(region.node_ids),
"infra_count": infra_total,
"infra_online": infra_online,
"online_count": online_count,
"score": round(region.score.composite, 1),
"tier": region.score.tier,
"center_lat": region.center_lat,
"center_lon": region.center_lon,
})
return regions
@router.get("/sources")
async def get_sources(request: Request):
"""Get per-source health information."""
data_store = request.app.state.data_store
if not data_store:
return []
sources = []
try:
for name, source in data_store._sources.items():
source_info = {
"name": name,
"type": "meshview" if hasattr(source, "edges") else "meshmonitor",
"url": getattr(source, "url", ""),
"is_loaded": source.is_loaded,
"last_error": source.last_error,
"consecutive_errors": getattr(source, "consecutive_errors", 0),
"response_time_ms": getattr(source, "last_response_time_ms", None),
"tick_count": getattr(source, "tick_count", 0),
"node_count": len(source.nodes) if hasattr(source, "nodes") else 0,
}
sources.append(source_info)
except Exception:
pass
return sources
@router.get("/edges")
async def get_edges(request: Request):
"""Get neighbor/edge relationships."""
data_store = request.app.state.data_store
if not data_store:
return []
try:
raw_edges = data_store.get_all_edges()
except Exception:
return []
edges = []
for edge in raw_edges:
from_num = edge.get("from_node") or edge.get("from")
to_num = edge.get("to_node") or edge.get("to")
snr = edge.get("snr")
# Derive quality from SNR
if snr is None:
quality = "unknown"
elif snr > 12:
quality = "excellent"
elif snr > 8:
quality = "good"
elif snr > 5:
quality = "fair"
elif snr > 3:
quality = "marginal"
else:
quality = "poor"
edges.append({
"from_node": from_num,
"to_node": to_num,
"snr": snr,
"quality": quality,
})
return edges

View file

@ -0,0 +1,63 @@
"""System status and control API routes."""
import time
from pathlib import Path
from fastapi import APIRouter, Request
from meshai import __version__
from meshai.commands.status import _start_time
router = APIRouter(tags=["system"])
@router.get("/status")
async def get_status(request: Request):
"""Get system status information."""
config = request.app.state.config
data_store = request.app.state.data_store
# Calculate uptime
uptime_seconds = time.time() - _start_time if _start_time else 0
# Connection info
conn = config.connection
if conn.type == "tcp":
connection_target = f"{conn.tcp_host}:{conn.tcp_port}"
else:
connection_target = conn.serial_port
# Count nodes and sources
node_count = 0
source_count = 0
connected = False
if data_store:
try:
nodes = data_store.get_all_nodes()
node_count = len(nodes) if nodes else 0
source_count = data_store.source_count
connected = any(s.is_loaded for s in data_store._sources.values())
except Exception:
pass
return {
"version": __version__,
"uptime_seconds": round(uptime_seconds, 1),
"bot_name": config.bot.name,
"connection_type": conn.type,
"connection_target": connection_target,
"connected": connected,
"node_count": node_count,
"source_count": source_count,
"env_feeds_enabled": request.app.state.env_store is not None,
"dashboard_port": config.dashboard.port,
}
@router.post("/restart")
async def restart_bot():
"""Signal the bot to restart."""
restart_file = Path("/tmp/meshai_restart")
restart_file.touch()
return {"restarting": True}

108
meshai/dashboard/server.py Normal file
View file

@ -0,0 +1,108 @@
"""FastAPI server for MeshAI dashboard."""
import asyncio
import logging
from contextlib import asynccontextmanager
from pathlib import Path
from typing import TYPE_CHECKING
import uvicorn
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from .ws import DashboardBroadcaster, router as ws_router
if TYPE_CHECKING:
from ..main import MeshAI
logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI):
"""FastAPI lifespan context manager."""
logger.info("Dashboard starting up")
yield
logger.info("Dashboard shutting down")
def create_app() -> FastAPI:
"""Create and configure the FastAPI application."""
app = FastAPI(
title="MeshAI Dashboard",
description="Web dashboard for MeshAI mesh network monitoring",
version="0.1.0",
lifespan=lifespan,
)
# CORS middleware for Vite dev server
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:5173", "http://localhost:8080"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Import and include API routers
from .api.system_routes import router as system_router
from .api.config_routes import router as config_router
from .api.mesh_routes import router as mesh_router
from .api.env_routes import router as env_router
from .api.alert_routes import router as alert_router
app.include_router(system_router, prefix="/api")
app.include_router(config_router, prefix="/api")
app.include_router(mesh_router, prefix="/api")
app.include_router(env_router, prefix="/api")
app.include_router(alert_router, prefix="/api")
# WebSocket router (no prefix, path is /ws/live)
app.include_router(ws_router)
# Static files - mount LAST so /api routes take priority
static_dir = Path(__file__).parent / "static"
if static_dir.exists():
app.mount("/", StaticFiles(directory=str(static_dir), html=True), name="static")
return app
async def start_dashboard(meshai_instance: "MeshAI") -> DashboardBroadcaster:
"""Start the dashboard server in the MeshAI asyncio loop.
Args:
meshai_instance: The running MeshAI instance
Returns:
DashboardBroadcaster instance for pushing updates
"""
app = create_app()
# Populate app.state with MeshAI internals
app.state.config = meshai_instance.config
app.state.config_path = meshai_instance.config._config_path
app.state.data_store = meshai_instance.data_store
app.state.health_engine = meshai_instance.health_engine
app.state.alert_engine = getattr(meshai_instance, "alert_engine", None)
app.state.env_store = getattr(meshai_instance, "env_store", None)
app.state.subscription_manager = meshai_instance.subscription_manager
# Create broadcaster and attach to app state
broadcaster = DashboardBroadcaster()
app.state.broadcaster = broadcaster
# Configure uvicorn
config = uvicorn.Config(
app,
host=meshai_instance.config.dashboard.host,
port=meshai_instance.config.dashboard.port,
log_level="warning", # Don't spam meshai logs with access logs
)
server = uvicorn.Server(config)
# Start server as asyncio task (runs in same event loop as MeshAI)
asyncio.create_task(server.serve())
return broadcaster

View file

@ -0,0 +1,20 @@
<!DOCTYPE html>
<html>
<head>
<title>MeshAI Dashboard</title>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
</head>
<body style="background:#0a0e17;color:#e2e8f0;font-family:system-ui,-apple-system,sans-serif;display:flex;align-items:center;justify-content:center;height:100vh;margin:0">
<div style="text-align:center;max-width:600px;padding:2rem">
<h1 style="font-size:2.5rem;margin-bottom:1rem;font-weight:600">MeshAI Dashboard</h1>
<p style="color:#64748b;margin-bottom:2rem">Frontend build pending - API is live.</p>
<div style="display:flex;gap:1rem;justify-content:center;flex-wrap:wrap">
<a href="/api/status" style="color:#60a5fa;text-decoration:none;padding:0.5rem 1rem;border:1px solid #334155;border-radius:0.5rem">/api/status</a>
<a href="/api/health" style="color:#60a5fa;text-decoration:none;padding:0.5rem 1rem;border:1px solid #334155;border-radius:0.5rem">/api/health</a>
<a href="/api/nodes" style="color:#60a5fa;text-decoration:none;padding:0.5rem 1rem;border:1px solid #334155;border-radius:0.5rem">/api/nodes</a>
<a href="/api/config" style="color:#60a5fa;text-decoration:none;padding:0.5rem 1rem;border:1px solid #334155;border-radius:0.5rem">/api/config</a>
</div>
</div>
</body>
</html>

115
meshai/dashboard/ws.py Normal file
View file

@ -0,0 +1,115 @@
"""WebSocket support for real-time dashboard updates."""
import logging
from typing import Set
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
logger = logging.getLogger(__name__)
router = APIRouter()
class DashboardBroadcaster:
"""Manages active WebSocket connections for real-time updates."""
def __init__(self):
self._connections: Set[WebSocket] = set()
async def connect(self, websocket: WebSocket) -> None:
"""Accept and register a new WebSocket connection."""
await websocket.accept()
self._connections.add(websocket)
logger.debug(f"WebSocket connected, total: {len(self._connections)}")
def disconnect(self, websocket: WebSocket) -> None:
"""Remove a WebSocket connection."""
self._connections.discard(websocket)
logger.debug(f"WebSocket disconnected, total: {len(self._connections)}")
async def broadcast(self, msg_type: str, data: dict) -> None:
"""Broadcast a message to all connected clients.
Args:
msg_type: Message type (e.g., "health_update", "alert_fired")
data: Message payload
"""
if not self._connections:
return
message = {"type": msg_type, "data": data}
dead_connections = set()
for websocket in self._connections:
try:
await websocket.send_json(message)
except Exception as e:
logger.debug(f"WebSocket send failed: {e}")
dead_connections.add(websocket)
# Remove dead connections
for ws in dead_connections:
self._connections.discard(ws)
@property
def connection_count(self) -> int:
"""Get number of active connections."""
return len(self._connections)
def _serialize_health(mesh_health) -> dict:
"""Serialize MeshHealth for WebSocket transmission."""
if not mesh_health:
return {"score": 0, "tier": "Unknown", "message": "No data"}
score = mesh_health.score
return {
"score": round(score.composite, 1),
"tier": score.tier,
"pillars": {
"infrastructure": round(score.infrastructure, 1),
"utilization": round(score.utilization, 1),
"behavior": round(score.behavior, 1),
"power": round(score.power, 1),
},
"infra_online": score.infra_online,
"infra_total": score.infra_total,
"util_percent": round(score.util_percent, 1),
"flagged_nodes": score.flagged_nodes,
"battery_warnings": score.battery_warnings,
"total_nodes": mesh_health.total_nodes,
"total_regions": mesh_health.total_regions,
"last_computed": mesh_health.last_computed,
}
@router.websocket("/ws/live")
async def ws_endpoint(websocket: WebSocket):
"""WebSocket endpoint for real-time updates."""
# Get broadcaster from app state
app_state = websocket.app.state
broadcaster = getattr(app_state, "broadcaster", None)
if not broadcaster:
await websocket.close(code=1011, reason="Broadcaster not initialized")
return
await broadcaster.connect(websocket)
try:
# Send initial state snapshot on connect
health_engine = getattr(app_state, "health_engine", None)
if health_engine and health_engine.mesh_health:
await websocket.send_json({
"type": "health_update",
"data": _serialize_health(health_engine.mesh_health)
})
# Keep connection alive, receive client keepalive pings
while True:
await websocket.receive_text()
except WebSocketDisconnect:
broadcaster.disconnect(websocket)
except Exception as e:
logger.debug(f"WebSocket error: {e}")
broadcaster.disconnect(websocket)

View file

@ -51,6 +51,7 @@ class MeshAI:
self._loop: Optional[asyncio.AbstractEventLoop] = None
self._last_cleanup: float = 0.0
self._last_health_compute: float = 0.0
self.broadcaster = None # Dashboard WebSocket broadcaster
async def start(self) -> None:
"""Start the bot."""
@ -94,12 +95,37 @@ class MeshAI:
self.health_engine.compute(self.data_store)
self._last_health_compute = time.time()
# Broadcast health update to dashboard
if self.broadcaster and self.health_engine.mesh_health:
try:
mh = self.health_engine.mesh_health
health_dict = {
"score": round(mh.score.composite, 1),
"tier": mh.score.tier,
"total_nodes": mh.total_nodes,
"total_regions": mh.total_regions,
"infra_online": mh.score.infra_online,
"infra_total": mh.score.infra_total,
"last_computed": mh.last_computed,
}
await self.broadcaster.broadcast("health_update", health_dict)
except Exception as e:
logger.debug("Dashboard broadcast error: %s", e)
# Check for alertable conditions
if self.alert_engine:
alerts = self.alert_engine.check()
if alerts:
await self._dispatch_alerts(alerts)
# Broadcast alerts to dashboard
if self.broadcaster:
for alert in alerts:
try:
await self.broadcaster.broadcast("alert_fired", alert)
except Exception:
pass
# Check scheduled subscriptions (every 60 seconds)
if self.subscription_manager and self.mesh_reporter:
if time.time() - self._last_sub_check >= 60:
@ -345,6 +371,18 @@ class MeshAI:
# Responder
self.responder = Responder(self.config.response, self.connector)
# Dashboard
if hasattr(self.config, 'dashboard') and self.config.dashboard.enabled:
try:
from .dashboard.server import start_dashboard
self.broadcaster = await start_dashboard(self)
logger.info("Dashboard started on port %d", self.config.dashboard.port)
except Exception as e:
logger.warning("Dashboard failed to start: %s", e)
self.broadcaster = None
else:
self.broadcaster = None
async def _on_message(self, message: MeshMessage) -> None:
"""Handle incoming message."""
try:

View file

@ -36,6 +36,8 @@ dependencies = [
"google-genai>=1.0.0",
"rich>=13.0.0",
"httpx>=0.25.0",
"fastapi>=0.110.0",
"uvicorn[standard]>=0.27.0",
]
[project.optional-dependencies]

View file

@ -9,3 +9,5 @@ httpx>=0.25.0
fastembed>=0.3.0
sqlite-vec>=0.1.0
numpy
fastapi>=0.110.0
uvicorn[standard]>=0.27.0