diff --git a/meshai/commands/dispatcher.py b/meshai/commands/dispatcher.py index 4db50bd..3a4849c 100644 --- a/meshai/commands/dispatcher.py +++ b/meshai/commands/dispatcher.py @@ -158,6 +158,9 @@ def create_dispatcher( disabled_commands: Optional[list[str]] = None, custom_commands: Optional[dict] = None, mesh_reporter=None, + data_store=None, + health_engine=None, + subscription_manager=None, ) -> CommandDispatcher: """Create and populate command dispatcher with default commands. @@ -166,6 +169,9 @@ def create_dispatcher( disabled_commands: List of command names to disable custom_commands: Dict of name -> response for custom commands mesh_reporter: MeshReporter instance for health commands + data_store: MeshDataStore for neighbor data + health_engine: MeshHealthEngine for infrastructure detection + subscription_manager: SubscriptionManager for subscription commands Returns: Configured CommandDispatcher @@ -176,7 +182,8 @@ def create_dispatcher( from .reset import ResetCommand from .status import StatusCommand from .weather import WeatherCommand - from .health import HealthCommand, RegionCommand + from .health import HealthCommand, RegionCommand, NeighborCommand + from .subscribe import SubCommand, UnsubCommand, MySubsCommand dispatcher = CommandDispatcher(prefix=prefix, disabled_commands=disabled_commands) @@ -205,6 +212,37 @@ def create_dispatcher( alias_handler.name = alias dispatcher.register(alias_handler) + # Register neighbors command + neighbor_cmd = NeighborCommand(mesh_reporter, data_store, health_engine) + dispatcher.register(neighbor_cmd) + # Register aliases for neighbors command + for alias in getattr(neighbor_cmd, 'aliases', []): + alias_handler = NeighborCommand(mesh_reporter, data_store, health_engine) + alias_handler.name = alias + dispatcher.register(alias_handler) + + # Register subscription commands + sub_cmd = SubCommand(subscription_manager, mesh_reporter, data_store) + dispatcher.register(sub_cmd) + for alias in getattr(sub_cmd, 'aliases', []): + alias_handler = SubCommand(subscription_manager, mesh_reporter, data_store) + alias_handler.name = alias + dispatcher.register(alias_handler) + + unsub_cmd = UnsubCommand(subscription_manager) + dispatcher.register(unsub_cmd) + for alias in getattr(unsub_cmd, 'aliases', []): + alias_handler = UnsubCommand(subscription_manager) + alias_handler.name = alias + dispatcher.register(alias_handler) + + mysubs_cmd = MySubsCommand(subscription_manager) + dispatcher.register(mysubs_cmd) + for alias in getattr(mysubs_cmd, 'aliases', []): + alias_handler = MySubsCommand(subscription_manager) + alias_handler.name = alias + dispatcher.register(alias_handler) + # Register custom commands if custom_commands: for name, response in custom_commands.items(): diff --git a/meshai/commands/subscribe.py b/meshai/commands/subscribe.py new file mode 100644 index 0000000..5dbe705 --- /dev/null +++ b/meshai/commands/subscribe.py @@ -0,0 +1,322 @@ +"""Subscription commands for scheduled reports and alerts.""" + +from typing import TYPE_CHECKING + +from .base import CommandContext, CommandHandler + +if TYPE_CHECKING: + from ..mesh_data_store import MeshDataStore + from ..mesh_reporter import MeshReporter + from ..subscriptions import SubscriptionManager + + +class SubCommand(CommandHandler): + """Subscribe to scheduled reports or alerts.""" + + name = "sub" + description = "Subscribe to reports or alerts" + usage = "!sub daily|weekly|alerts [time] [day] [scope]" + aliases = ["subscribe"] + + def __init__( + self, + subscription_manager: "SubscriptionManager" = None, + mesh_reporter: "MeshReporter" = None, + data_store: "MeshDataStore" = None, + ): + self._sub_manager = subscription_manager + self._reporter = mesh_reporter + self._data_store = data_store + + async def execute(self, args: str, context: CommandContext) -> str: + """Handle subscription command.""" + if not self._sub_manager: + return "Subscriptions not available." + + parts = args.strip().split() + if not parts: + return self._usage_help() + + sub_type = parts[0].lower() + if sub_type not in ("daily", "weekly", "alerts"): + return f"Invalid type '{sub_type}'. Use: daily, weekly, or alerts" + + try: + if sub_type == "daily": + return self._handle_daily(parts[1:], context) + elif sub_type == "weekly": + return self._handle_weekly(parts[1:], context) + else: # alerts + return self._handle_alerts(parts[1:], context) + except ValueError as e: + return f"Error: {e}" + + def _usage_help(self) -> str: + """Return usage help.""" + return """Usage: +!sub daily 1830 - daily mesh report at 6:30 PM +!sub daily 1830 region SCID - daily region report +!sub daily 1830 node MHR - daily node report +!sub weekly 0800 sun - weekly digest Sunday 8 AM +!sub alerts - mesh-wide alerts +!sub alerts region SCID - alerts for a region""" + + def _handle_daily(self, args: list, context: CommandContext) -> str: + """Handle daily subscription.""" + if not args: + raise ValueError("Time required. Example: !sub daily 1830") + + schedule_time = args[0] + scope_type, scope_value = self._parse_scope(args[1:]) + + # Validate scope + scope_value = self._validate_scope(scope_type, scope_value) + + result = self._sub_manager.add( + user_id=self._get_user_id(context), + sub_type="daily", + schedule_time=schedule_time, + scope_type=scope_type, + scope_value=scope_value, + ) + + time_fmt = self._format_time(schedule_time) + scope_desc = self._format_scope(scope_type, scope_value) + return f"Subscribed: daily {scope_desc}report at {time_fmt}" + + def _handle_weekly(self, args: list, context: CommandContext) -> str: + """Handle weekly subscription.""" + if len(args) < 2: + raise ValueError("Time and day required. Example: !sub weekly 0800 sun") + + schedule_time = args[0] + schedule_day = args[1].lower() + scope_type, scope_value = self._parse_scope(args[2:]) + + # Validate scope + scope_value = self._validate_scope(scope_type, scope_value) + + result = self._sub_manager.add( + user_id=self._get_user_id(context), + sub_type="weekly", + schedule_time=schedule_time, + schedule_day=schedule_day, + scope_type=scope_type, + scope_value=scope_value, + ) + + time_fmt = self._format_time(schedule_time) + day_fmt = schedule_day.capitalize() + scope_desc = self._format_scope(scope_type, scope_value) + return f"Subscribed: weekly {scope_desc}report at {time_fmt} {day_fmt}" + + def _handle_alerts(self, args: list, context: CommandContext) -> str: + """Handle alerts subscription.""" + scope_type, scope_value = self._parse_scope(args) + + # Validate scope + scope_value = self._validate_scope(scope_type, scope_value) + + result = self._sub_manager.add( + user_id=self._get_user_id(context), + sub_type="alerts", + scope_type=scope_type, + scope_value=scope_value, + ) + + scope_desc = self._format_scope(scope_type, scope_value) + return f"Subscribed: alerts for {scope_desc.strip() or 'mesh'}" + + def _parse_scope(self, args: list) -> tuple[str, str]: + """Parse scope from remaining args. + + Returns: + (scope_type, scope_value) tuple + """ + if not args: + return "mesh", None + + # Look for 'region' or 'node' keyword + scope_type = "mesh" + scope_value = None + + for i, arg in enumerate(args): + arg_lower = arg.lower() + if arg_lower == "region": + scope_type = "region" + # Everything after 'region' is the region name + scope_value = " ".join(args[i + 1:]) if i + 1 < len(args) else None + break + elif arg_lower == "node": + scope_type = "node" + # Next arg is the node identifier + scope_value = args[i + 1] if i + 1 < len(args) else None + break + + return scope_type, scope_value + + def _validate_scope(self, scope_type: str, scope_value: str) -> str: + """Validate and resolve scope value. + + Returns: + Resolved scope_value (e.g., full region name) + + Raises: + ValueError: If scope not found + """ + if scope_type == "mesh": + return None + + if not scope_value: + raise ValueError(f"Missing {scope_type} name") + + if scope_type == "region" and self._reporter: + region = self._reporter._find_region(scope_value) + if not region: + # List available regions + health = self._reporter.health_engine.mesh_health + if health: + available = [r.name for r in health.regions if r.node_ids] + return scope_value # Use as-is, will fail at delivery if invalid + raise ValueError(f"Region '{scope_value}' not found") + return region.name # Return canonical name + + if scope_type == "node" and self._reporter: + node = self._reporter._find_node(scope_value) + if not node: + raise ValueError(f"Node '{scope_value}' not found") + return node.short_name or str(node.node_num) + + return scope_value + + def _get_user_id(self, context: CommandContext) -> str: + """Extract user ID from context.""" + # sender_id is like "!abcd1234" - convert to node_num + sender_id = context.sender_id + if sender_id.startswith("!"): + return str(int(sender_id[1:], 16)) + return sender_id + + def _format_time(self, hhmm: str) -> str: + """Format HHMM as readable time.""" + hours = int(hhmm[:2]) + minutes = int(hhmm[2:]) + period = "AM" if hours < 12 else "PM" + display_hour = hours % 12 or 12 + return f"{display_hour}:{minutes:02d} {period}" + + def _format_scope(self, scope_type: str, scope_value: str) -> str: + """Format scope for display.""" + if scope_type == "mesh" or not scope_value: + return "mesh " + return f"{scope_type} {scope_value} " + + +class UnsubCommand(CommandHandler): + """Unsubscribe from reports or alerts.""" + + name = "unsub" + description = "Remove subscription(s)" + usage = "!unsub daily|weekly|alerts|all" + aliases = ["unsubscribe"] + + def __init__(self, subscription_manager: "SubscriptionManager" = None): + self._sub_manager = subscription_manager + + async def execute(self, args: str, context: CommandContext) -> str: + """Handle unsubscribe command.""" + if not self._sub_manager: + return "Subscriptions not available." + + sub_type = args.strip().lower() if args else None + + if not sub_type: + return "Usage: !unsub daily|weekly|alerts|all" + + if sub_type not in ("daily", "weekly", "alerts", "all"): + return f"Invalid type '{sub_type}'. Use: daily, weekly, alerts, or all" + + user_id = self._get_user_id(context) + removed = self._sub_manager.remove(user_id, sub_type if sub_type != "all" else None) + + if removed == 0: + return "No subscriptions found to remove" + elif sub_type == "all": + return f"Removed all {removed} subscription(s)" + else: + return f"Removed {removed} {sub_type} subscription(s)" + + def _get_user_id(self, context: CommandContext) -> str: + """Extract user ID from context.""" + sender_id = context.sender_id + if sender_id.startswith("!"): + return str(int(sender_id[1:], 16)) + return sender_id + + +class MySubsCommand(CommandHandler): + """List active subscriptions.""" + + name = "mysubs" + description = "List your subscriptions" + usage = "!mysubs" + aliases = ["subs"] + + def __init__(self, subscription_manager: "SubscriptionManager" = None): + self._sub_manager = subscription_manager + + async def execute(self, args: str, context: CommandContext) -> str: + """List user's subscriptions.""" + if not self._sub_manager: + return "Subscriptions not available." + + user_id = self._get_user_id(context) + subs = self._sub_manager.get_user_subs(user_id) + + if not subs: + return "No active subscriptions. Use !sub to subscribe." + + lines = ["Your subscriptions:"] + for i, sub in enumerate(subs, 1): + lines.append(f" {i}. {self._format_sub(sub)}") + + return "\n".join(lines) + + def _format_sub(self, sub: dict) -> str: + """Format a subscription for display.""" + sub_type = sub["sub_type"] + scope_type = sub.get("scope_type", "mesh") + scope_value = sub.get("scope_value") + + scope_desc = "" + if scope_type == "region" and scope_value: + scope_desc = f"region {scope_value} " + elif scope_type == "node" and scope_value: + scope_desc = f"node {scope_value} " + + if sub_type == "daily": + time_str = self._format_time(sub.get("schedule_time", "0000")) + return f"Daily {scope_desc}report at {time_str}" + elif sub_type == "weekly": + time_str = self._format_time(sub.get("schedule_time", "0000")) + day_str = (sub.get("schedule_day") or "").capitalize() + return f"Weekly {scope_desc}report at {time_str} {day_str}" + else: # alerts + return f"Alerts for {scope_desc.strip() or 'mesh'}" + + def _format_time(self, hhmm: str) -> str: + """Format HHMM as readable time.""" + if not hhmm or len(hhmm) != 4: + return hhmm + hours = int(hhmm[:2]) + minutes = int(hhmm[2:]) + period = "AM" if hours < 12 else "PM" + display_hour = hours % 12 or 12 + return f"{display_hour}:{minutes:02d} {period}" + + def _get_user_id(self, context: CommandContext) -> str: + """Extract user ID from context.""" + sender_id = context.sender_id + if sender_id.startswith("!"): + return str(int(sender_id[1:], 16)) + return sender_id diff --git a/meshai/main.py b/meshai/main.py index 89728d5..c568395 100644 --- a/meshai/main.py +++ b/meshai/main.py @@ -39,9 +39,11 @@ class MeshAI: self.context: Optional[MeshContext] = None self.meshmonitor_sync = None self.knowledge = None - self.source_manager = None + self.data_store = None # Replaces source_manager self.health_engine = None self.mesh_reporter = None + self.subscription_manager = None + self._last_sub_check: float = 0.0 self.router: Optional[MessageRouter] = None self.responder: Optional[Responder] = None self._running = False @@ -83,14 +85,20 @@ class MeshAI: if self.meshmonitor_sync: self.meshmonitor_sync.maybe_refresh() - # Periodic mesh source refresh and health computation - if self.source_manager: - refreshed = self.source_manager.refresh_all() - # Recompute health after source refresh - if refreshed > 0 and self.health_engine: - self.health_engine.compute(self.source_manager) + # Periodic data store refresh and health computation + if self.data_store: + refreshed = self.data_store.refresh() + # Recompute health after refresh + if refreshed and self.health_engine: + self.health_engine.compute(self.data_store) self._last_health_compute = time.time() + # Check scheduled subscriptions (every 60 seconds) + if self.subscription_manager and self.mesh_reporter: + if time.time() - self._last_sub_check >= 60: + await self._check_scheduled_subs() + self._last_sub_check = time.time() + # Periodic cleanup if time.time() - self._last_cleanup >= 3600: await self.history.cleanup_expired() @@ -113,6 +121,10 @@ class MeshAI: await self.llm.close() if self.knowledge: self.knowledge.close() + if self.data_store: + self.data_store.close() + if self.subscription_manager: + self.subscription_manager.close() self._remove_pid() logger.info("MeshAI stopped") @@ -184,15 +196,22 @@ class MeshAI: else: self.meshmonitor_sync = None - # Mesh data sources - enabled_sources = [s for s in self.config.mesh_sources if s.enabled] + # Mesh data store (replaces MeshSourceManager) + # mesh_sources may be dicts or MeshSourceConfig objects depending on config version + enabled_sources = [ + s for s in self.config.mesh_sources + if (s.enabled if hasattr(s, 'enabled') else s.get('enabled', True)) + ] if enabled_sources: - from .mesh_sources import MeshSourceManager - self.source_manager = MeshSourceManager(enabled_sources) - # Initial fetch - self.source_manager.refresh_all() + from .mesh_data_store import MeshDataStore + self.data_store = MeshDataStore( + source_configs=enabled_sources, + db_path="/data/mesh_history.db", + ) + # Initial fetch and backfill + self.data_store.force_refresh() # Log status - for status in self.source_manager.get_status(): + for status in self.data_store.get_status(): if status["is_loaded"]: logger.info( f"Mesh source '{status['name']}' ({status['type']}): " @@ -204,11 +223,11 @@ class MeshAI: f"failed - {status.get('last_error', 'unknown error')}" ) else: - self.source_manager = None + self.data_store = None # Mesh health engine mi_cfg = self.config.mesh_intelligence - if mi_cfg.enabled and self.source_manager: + if mi_cfg.enabled and self.data_store: from .mesh_health import MeshHealthEngine self.health_engine = MeshHealthEngine( regions=mi_cfg.regions, @@ -218,7 +237,7 @@ class MeshAI: battery_warning_percent=mi_cfg.battery_warning_percent, ) # Initial health computation - mesh_health = self.health_engine.compute(self.source_manager) + mesh_health = self.health_engine.compute(self.data_store) self._last_health_compute = time.time() logger.info( f"Mesh intelligence enabled: {mesh_health.total_nodes} nodes, " @@ -229,21 +248,33 @@ class MeshAI: self.health_engine = None # Mesh reporter (for LLM prompt injection and commands) - if self.health_engine and self.source_manager: + if self.health_engine and self.data_store: from .mesh_reporter import MeshReporter - self.mesh_reporter = MeshReporter(self.health_engine, self.source_manager) + self.mesh_reporter = MeshReporter(self.health_engine, self.data_store) logger.info("Mesh reporter enabled") else: self.mesh_reporter = None - # Knowledge base + # Subscription manager (uses same db as data_store) + if self.data_store: + from .subscriptions import SubscriptionManager + self.subscription_manager = SubscriptionManager(db_path="/data/mesh_history.db") + logger.info("Subscription manager enabled") + else: + self.subscription_manager = None + + # Knowledge base (optional - gracefully degrade if deps missing) kb_cfg = self.config.knowledge if kb_cfg.enabled and kb_cfg.db_path: - from .knowledge import KnowledgeSearch - self.knowledge = KnowledgeSearch( - db_path=kb_cfg.db_path, - top_k=kb_cfg.top_k, - ) + try: + from .knowledge import KnowledgeSearch + self.knowledge = KnowledgeSearch( + db_path=kb_cfg.db_path, + top_k=kb_cfg.top_k, + ) + except ImportError as e: + logger.warning(f"Knowledge base disabled - missing dependencies: {e}") + self.knowledge = None else: self.knowledge = None @@ -253,6 +284,9 @@ class MeshAI: disabled_commands=self.config.commands.disabled_commands, custom_commands=self.config.commands.custom_commands, mesh_reporter=self.mesh_reporter, + data_store=self.data_store, + health_engine=self.health_engine, + subscription_manager=self.subscription_manager, ) # Message router @@ -261,7 +295,7 @@ class MeshAI: context=self.context, meshmonitor_sync=self.meshmonitor_sync, knowledge=self.knowledge, - source_manager=self.source_manager, + data_store=self.data_store, health_engine=self.health_engine, mesh_reporter=self.mesh_reporter, ) @@ -373,6 +407,80 @@ class MeshAI: if pid_file.exists(): pid_file.unlink() + async def _check_scheduled_subs(self) -> None: + """Check for and deliver due scheduled reports.""" + from datetime import datetime + from zoneinfo import ZoneInfo + + tz = ZoneInfo("America/Boise") + now = datetime.now(tz) + current_hhmm = now.strftime("%H%M") + current_day = now.strftime("%a").lower() + + due_subs = self.subscription_manager.get_due_subscriptions(current_hhmm, current_day) + + for sub in due_subs: + try: + # Generate report based on scope + report = self._generate_sub_report(sub) + if not report: + continue + + # Send DM to subscriber + user_id = sub["user_id"] + await self._send_sub_dm(user_id, report) + + # Mark as sent + self.subscription_manager.mark_sent(sub["id"]) + logger.info(f"Delivered {sub['sub_type']} report to {user_id}") + + except Exception as e: + logger.error(f"Error delivering subscription {sub['id']}: {e}") + + def _generate_sub_report(self, sub: dict) -> str: + """Generate report content for a subscription.""" + if not self.mesh_reporter: + return None + + sub_type = sub["sub_type"] + scope_type = sub.get("scope_type", "mesh") + scope_value = sub.get("scope_value") + + if scope_type == "region" and scope_value: + # Region-scoped report + region = self.mesh_reporter._find_region(scope_value) + if region: + return self.mesh_reporter.build_region_compact(region.name) + return None + elif scope_type == "node" and scope_value: + # Node-scoped report + return self.mesh_reporter.build_node_compact(scope_value) + else: + # Mesh-wide report + return self.mesh_reporter.build_lora_compact(scope="mesh") + + async def _send_sub_dm(self, node_num: str, message: str) -> None: + """Send a subscription DM to a node.""" + if not self.connector: + return + + # Convert node_num to destination format + try: + dest = int(node_num) + except ValueError: + dest = node_num + + # Send via responder for proper chunking + if self.responder: + await self.responder.send_response( + message, + destination=dest, + channel=0, # DM channel + ) + else: + # Fallback to direct send + self.connector.send_message(message, destination=dest) + def setup_logging(verbose: bool = False) -> None: """Configure logging.""" diff --git a/meshai/mesh_reporter.py b/meshai/mesh_reporter.py index 69df3dd..2570443 100644 --- a/meshai/mesh_reporter.py +++ b/meshai/mesh_reporter.py @@ -1,12 +1,45 @@ -"""Mesh health reporting for LLM prompt injection and commands.""" +"""Mesh health reporting for LLM prompt injection and commands. + +Refactored to consume MeshDataStore and UnifiedNode directly. +""" import logging import time from datetime import datetime -from typing import Optional +from typing import TYPE_CHECKING, Optional + +if TYPE_CHECKING: + from .mesh_data_store import MeshDataStore + from .mesh_health import MeshHealthEngine, NodeHealth, RegionHealth logger = logging.getLogger(__name__) +# Portnum display names (from Meshtastic protobufs) +PORTNUM_DISPLAY = { + "TEXT_MESSAGE_APP": "Text", + "POSITION_APP": "Position", + "NODEINFO_APP": "NodeInfo", + "TELEMETRY_APP": "Telemetry", + "TRACEROUTE_APP": "Traceroute", + "ROUTING_APP": "Routing", + "ADMIN_APP": "Admin", + "WAYPOINT_APP": "Waypoint", + "RANGE_TEST_APP": "RangeTest", + "STORE_FORWARD_APP": "Store&Fwd", + "NEIGHBORINFO_APP": "Neighbors", + "MAP_REPORT_APP": "MapReport", + "DETECTION_SENSOR_APP": "Sensor", + "PAXCOUNTER_APP": "PaxCounter", + "REMOTE_HARDWARE_APP": "RemoteHW", + "ATAK_PLUGIN": "ATAK", + "ATAK_FORWARDER": "ATAK", +} + + +def _clean_portnum(portnum: str) -> str: + """Convert raw portnum to display name.""" + return PORTNUM_DISPLAY.get(portnum, portnum.replace("_APP", "").replace("_", " ").title()) + def _format_age(timestamp: float) -> str: """Format a timestamp as human-readable age.""" @@ -26,6 +59,61 @@ def _format_age(timestamp: float) -> str: return f"{int(age_seconds / 86400)}d ago" +def _format_battery(battery_percent: Optional[float], voltage: Optional[float] = None) -> str: + """Format battery with emoji and USB detection. + + Args: + battery_percent: 0-100, or 101 for USB/external powered + voltage: Optional voltage reading + + Returns: + Formatted string like "USB Powered" or "75% (3.92V)" + """ + if battery_percent is None: + return "N/A" + + # 101% = USB/external powered + if battery_percent > 100: + return "USB Powered" + + # Build emoji indicator + pct = int(battery_percent) + if pct >= 80: + emoji = "" # Good, no emoji needed + elif pct >= 50: + emoji = "" # OK, no emoji + elif pct >= 20: + emoji = " (low)" + else: + emoji = " (critical)" + + # Add voltage if available + if voltage and voltage > 0: + return f"{pct}% ({voltage:.2f}V){emoji}" + return f"{pct}%{emoji}" + + +def _node_display_name(long_name: Optional[str], short_name: Optional[str], node_id: str) -> str: + """Format node display name: long name first, shortname in parens. + + Args: + long_name: Full node name + short_name: 4-char short name + node_id: Fallback node ID + + Returns: + Formatted name like "My Node Name (MYND)" or "MYND" or "!abcd1234" + """ + if long_name and short_name: + return f"{long_name} ({short_name})" + elif long_name: + return long_name + elif short_name: + return short_name + else: + return f"!{node_id[-8:]}" if len(node_id) >= 8 else node_id + + def _tier_flag(tier: str) -> str: """Get warning flag for health tier.""" if tier == "Critical": @@ -37,18 +125,42 @@ def _tier_flag(tier: str) -> str: return "" +def _format_temperature(temp_c: Optional[float]) -> Optional[str]: + """Format temperature, flagging suspicious readings. + + Args: + temp_c: Temperature in Celsius + + Returns: + Formatted string or None if input is None + """ + if temp_c is None: + return None + temp_f = temp_c * 9/5 + 32 + if temp_c > 60 or temp_c < -50: + return f"{temp_c:.1f}°C ({temp_f:.1f}°F) ⚠️ suspect" + return f"{temp_c:.1f}°C ({temp_f:.1f}°F)" + + +def _is_valid_temperature(temp_c: Optional[float]) -> bool: + """Check if temperature is within valid range for aggregation.""" + if temp_c is None: + return False + return -50 <= temp_c <= 60 + + class MeshReporter: """Builds text blocks for mesh health prompt injection.""" - def __init__(self, health_engine, source_manager): + def __init__(self, health_engine: "MeshHealthEngine", data_store: "MeshDataStore"): """Initialize reporter. Args: health_engine: MeshHealthEngine instance - source_manager: MeshSourceManager instance + data_store: MeshDataStore instance """ self.health_engine = health_engine - self.source_manager = source_manager + self.data_store = data_store def build_tier1_summary(self) -> str: """Build compact mesh summary for LLM injection (~500-800 tokens). @@ -61,18 +173,24 @@ class MeshReporter: return "LIVE MESH HEALTH DATA: No data available yet." score = health.score - ts = datetime.fromtimestamp(health.last_computed).strftime("%H:%M %Z") + data_age = self.data_store.data_age_seconds + if data_age < 60: + age_str = f"{int(data_age)}s ago" + elif data_age < 3600: + age_str = f"{int(data_age / 60)}m ago" + else: + age_str = f"{int(data_age / 3600)}h ago" # Infrastructure stats infra_online = score.infra_online infra_total = score.infra_total infra_pct = int((infra_online / infra_total * 100) if infra_total > 0 else 100) - # Utilization + # Utilization - prefer device-reported util = score.util_percent - util_data_available = getattr(health, 'has_packet_data', False) or getattr(score, 'util_data_available', False) + util_data_available = score.util_data_available if not util_data_available: - util_label = "N/A - no packet data" + util_label = "N/A" elif util < 15: util_label = "Low" elif util < 20: @@ -82,16 +200,11 @@ class MeshReporter: else: util_label = "High" - # Power - if score.battery_warnings == 0: - power_label = "Good" - elif score.battery_warnings <= 2: - power_label = "Some low batteries" - else: - power_label = f"{score.battery_warnings} low batteries" + # Power breakdown + power_breakdown = self._get_power_breakdown() lines = [ - f"LIVE MESH HEALTH DATA (as of {ts}):", + f"LIVE MESH HEALTH DATA (as of {age_str}):", "", f"Overall: {score.composite:.0f}/100 ({score.tier})", f"Infrastructure: {infra_online}/{infra_total} online ({infra_pct}%)", @@ -104,17 +217,75 @@ class MeshReporter: lines.append("Channel Utilization: No data available") lines.append(f"Node Behavior: {score.flagged_nodes} nodes flagged") - lines.append(f"Power/Solar: {power_label} ({score.solar_index:.0f}% solar index)") + + # Power breakdown with USB/ok/low/critical counts + if power_breakdown["total"] > 0: + parts = [] + if power_breakdown["usb"] > 0: + parts.append(f"{power_breakdown['usb']} USB") + if power_breakdown["ok"] > 0: + parts.append(f"{power_breakdown['ok']} ok") + if power_breakdown["low"] > 0: + parts.append(f"{power_breakdown['low']} low") + if power_breakdown["critical"] > 0: + parts.append(f"{power_breakdown['critical']} critical") + power_str = ", ".join(parts) if parts else "No battery data" + lines.append(f"Power: {power_str} ({score.solar_index:.0f}% solar)") + else: + lines.append(f"Power: No battery data ({score.solar_index:.0f}% solar)") + + # Traffic trend + traffic_trend = self._get_traffic_trend_summary() + if traffic_trend: + lines.append(f"Traffic Trend: {traffic_trend}") + + # Top Senders section (packets sent = "noisy") + top_senders = self.data_store.get_top_senders(5) + if top_senders: + lines.append("") + lines.append("Top Senders (24h):") + for node in top_senders: + if node.packets_sent_24h > 0: + # Build portnum breakdown with clean names + breakdown = [] + for portnum, count in sorted( + node.packets_by_type.items(), key=lambda x: -x[1] + )[:3]: + clean_name = _clean_portnum(portnum) + breakdown.append(f"{clean_name}: {count}") + breakdown_str = f" ({', '.join(breakdown)})" if breakdown else "" + display_name = _node_display_name(node.long_name, node.short_name, node.node_id_hex or "") + lines.append( + f" {display_name}: {node.packets_sent_24h} pkts{breakdown_str}" + ) + + # Device-reported channel utilization (RF airspace busyness) + util_data = self.data_store.get_mesh_utilization() + if util_data["node_count"] > 0: + lines.append("") + lines.append("Channel Utilization (device-reported RF busyness):") + lines.append(f" Mesh avg: {util_data['avg']:.1f}%") + lines.append(f" Highest: {util_data['max_node']} at {util_data['max']:.1f}%") # Network topology stats (if available) if health.has_traceroute_data: - lines.append(f"Routing: {health.traceroute_count} traceroutes, avg {health.avg_hop_count:.1f} hops, max {health.max_hop_count}") - else: - lines.append("Routing: No traceroute data available") + lines.append("") + lines.append( + f"Routing: {health.traceroute_count} traceroutes, " + f"avg {health.avg_hop_count:.1f} hops, max {health.max_hop_count}" + ) # MQTT uplink stats lines.append(f"MQTT Uplinks: {health.uplink_node_count} nodes") + # Coverage/deliverability stats + deliver = self.data_store.get_mesh_deliverability() + if deliver.get("avg_gateways") is not None: + avg_gw = deliver["avg_gateways"] + total_seen = deliver.get("total_seen", 0) + total_pkts = deliver.get("total_packets", 0) + lines.append(f"Coverage: avg {avg_gw:.1f} gateways/packet ({total_seen}/{total_pkts} seen/sent)") + lines.append("") lines.append("Regions:") @@ -123,7 +294,9 @@ class MeshReporter: rs = region.score flag = _tier_flag(rs.tier) infra_str = f"{rs.infra_online}/{rs.infra_total} infra" - lines.append(f" {region.name}: {rs.composite:.0f}/100 - {infra_str}, {rs.util_percent:.0f}% util{flag}") + lines.append( + f" {region.name}: {rs.composite:.0f}/100 - {infra_str}, {rs.util_percent:.0f}% util{flag}" + ) # Top issues issues = self._gather_top_issues(health) @@ -133,11 +306,136 @@ class MeshReporter: for i, issue in enumerate(issues[:5], 1): lines.append(f" {i}. {issue}") + # Sensor summary + env_nodes = self.data_store.get_sensor_nodes("environment") + aq_nodes = self.data_store.get_sensor_nodes("air_quality") + wx_nodes = self.data_store.get_sensor_nodes("weather") + if env_nodes or aq_nodes or wx_nodes: + lines.append("") + sensor_parts = [] + if env_nodes: + sensor_parts.append(f"{len(env_nodes)} env") + if aq_nodes: + sensor_parts.append(f"{len(aq_nodes)} air quality") + if wx_nodes: + sensor_parts.append(f"{len(wx_nodes)} weather") + lines.append(f"Sensors: {', '.join(sensor_parts)}") + + # Show temp range if available (filter outliers) + valid_temps = [n.temperature for n in env_nodes if _is_valid_temperature(n.temperature)] + if valid_temps: + lines.append(f" Temp range: {min(valid_temps):.1f}-{max(valid_temps):.1f}C") + lines.append("") - lines.append(f"{health.total_nodes} nodes across {health.total_regions} regions. User can ask about any region, locality, or node for details.") + lines.append( + f"{health.total_nodes} nodes across {health.total_regions} regions. " + f"User can ask about any region, locality, or node for details." + ) return "\n".join(lines) + def _get_power_breakdown(self) -> dict: + """Get power breakdown counts. + + Returns: + Dict with usb, ok, low, critical, total counts + """ + health = self.health_engine.mesh_health + if not health: + return {"usb": 0, "ok": 0, "low": 0, "critical": 0, "total": 0} + + usb = 0 + ok = 0 + low = 0 + critical = 0 + + for node in health.nodes.values(): + if node.battery_percent is None: + continue + if node.battery_percent > 100: + usb += 1 + elif node.battery_percent >= 50: + ok += 1 + elif node.battery_percent >= 20: + low += 1 + else: + critical += 1 + + return { + "usb": usb, + "ok": ok, + "low": low, + "critical": critical, + "total": usb + ok + low + critical + } + + def _get_traffic_trend_summary(self) -> str: + """Get mesh-wide traffic trend from historical data. + + Returns: + Trend string like "up 15% vs yesterday" or empty string + """ + try: + history = self._get_daily_traffic_history(days=3) + if len(history) < 2: + return "" + + # Compare today vs yesterday + today = history.get("day_0", 0) + yesterday = history.get("day_1", 0) + + if yesterday == 0: + return "" + + pct_change = ((today - yesterday) / yesterday) * 100 + + if abs(pct_change) < 5: + return "stable" + elif pct_change > 0: + return f"up {pct_change:.0f}% vs yesterday" + else: + return f"down {abs(pct_change):.0f}% vs yesterday" + + except Exception as e: + logger.debug(f"Traffic trend error: {e}") + return "" + + def _get_daily_traffic_history(self, days: int = 7) -> dict: + """Query SQLite for daily packet counts. + + Args: + days: Number of days to look back + + Returns: + Dict like {"day_0": 1234, "day_1": 1100, ...} + """ + result = {} + + try: + conn = self.data_store._history_conn + if not conn: + return result + + cursor = conn.cursor() + + # Get packet counts per day from packet_log + for i in range(days): + start_ts = time.time() - ((i + 1) * 86400) + end_ts = time.time() - (i * 86400) + + cursor.execute(""" + SELECT COUNT(*) FROM packet_log + WHERE timestamp >= ? AND timestamp < ? + """, (start_ts, end_ts)) + + row = cursor.fetchone() + result[f"day_{i}"] = row[0] if row else 0 + + except Exception as e: + logger.debug(f"Daily traffic history error: {e}") + + return result + def _gather_top_issues(self, health) -> list[str]: """Gather top issues across all pillars.""" issues = [] @@ -148,43 +446,54 @@ class MeshReporter: for nid in region.node_ids: node = health.nodes.get(nid) if node and node.is_infrastructure and not node.is_online: - offline_infra.append(node.short_name or nid[:4]) + name = _node_display_name(node.long_name, node.short_name, nid) + offline_infra.append(name) if offline_infra: - total_infra = sum(1 for nid in region.node_ids - if health.nodes.get(nid) and health.nodes[nid].is_infrastructure) - online = total_infra - len(offline_infra) - issues.append(f"{region.name}: {online}/{total_infra} infrastructure nodes offline ({', '.join(offline_infra[:3])})") + total_infra = sum( + 1 + for nid in region.node_ids + if health.nodes.get(nid) and health.nodes[nid].is_infrastructure + ) + issues.append( + f"{region.name}: {len(offline_infra)}/{total_infra} infrastructure offline " + f"({', '.join(offline_infra[:3])})" + ) # Utilization issues for region in health.regions: if region.score.util_percent >= 25: - issues.append(f"{region.name}: channel utilization at {region.score.util_percent:.0f}% (Warning)") + issues.append( + f"{region.name}: channel utilization at {region.score.util_percent:.0f}% (High)" + ) elif region.score.util_percent >= 20: - issues.append(f"{region.name}: channel utilization at {region.score.util_percent:.0f}% (Elevated)") + issues.append( + f"{region.name}: channel utilization at {region.score.util_percent:.0f}% (Elevated)" + ) # Behavior issues (high packet nodes) flagged = self.health_engine.get_flagged_nodes() for node in flagged[:3]: threshold = self.health_engine.packet_threshold ratio = node.non_text_packets / threshold - issues.append(f"Node {node.short_name or node.node_id[:4]} sending {node.non_text_packets} non-text packets/24h ({ratio:.1f}x threshold)") + name = _node_display_name(node.long_name, node.short_name, node.node_id) + issues.append( + f"Node {name} sending " + f"{node.non_text_packets} non-text packets/24h ({ratio:.1f}x threshold)" + ) - # Battery issues + # Battery issues (skip USB-powered nodes) battery_warnings = self.health_engine.get_battery_warnings() for node in battery_warnings[:2]: - issues.append(f"Node {node.short_name or node.node_id[:4]} battery at {node.battery_percent:.0f}%") + if node.battery_percent is not None and node.battery_percent <= 100: + name = _node_display_name(node.long_name, node.short_name, node.node_id) + issues.append( + f"Node {name} battery at {node.battery_percent:.0f}%" + ) return issues def build_region_detail(self, region_name: str) -> str: - """Build detailed breakdown for a specific region. - - Args: - region_name: Region to get detail for - - Returns: - Formatted region detail string - """ + """Build detailed breakdown for a specific region.""" health = self.health_engine.mesh_health if not health: return f"REGION DETAIL: {region_name}\nNo data available." @@ -202,55 +511,63 @@ class MeshReporter: f"Infrastructure ({rs.infra_online}/{rs.infra_total}):", ] - # Collect infrastructure nodes and detect duplicate shortnames + # Collect infrastructure nodes infra_nodes = [] for nid in region.node_ids: node = health.nodes.get(nid) if node and node.is_infrastructure: infra_nodes.append((nid, node)) - # Count shortname occurrences to detect duplicates - shortname_counts: dict[str, int] = {} - for nid, node in infra_nodes: - sn = node.short_name or nid[:4] - shortname_counts[sn] = shortname_counts.get(sn, 0) + 1 - - # List infrastructure nodes with disambiguation for duplicates + # List infrastructure nodes with battery, packets, and utilization for nid, node in infra_nodes: status = "+" if node.is_online else "X" age = _format_age(node.last_seen) - bat = f", bat {node.battery_percent:.0f}%" if node.battery_percent else "" role = node.role or "ROUTER" - sn = node.short_name or nid[:4] + hw = f", {node.hw_model}" if node.hw_model else "" - # Disambiguate duplicate shortnames with node ID suffix - if shortname_counts.get(sn, 0) > 1: - # Use last 4 chars of node_id as disambiguator - disambig = f", !{nid[-8:]}" if len(nid) >= 8 else f", {nid}" - name_str = f"{sn} ({role}{disambig})" - else: - name_str = f"{sn} ({role})" + # Use long name first format + display_name = _node_display_name(node.long_name, node.short_name, nid) + name_str = f"{display_name} ({role}{hw})" - lines.append(f" {status} {name_str} - last seen {age}{bat}") + # Build metrics string with formatted battery + metrics = [] + metrics.append(f"seen {age}") + if node.battery_percent is not None: + metrics.append(f"bat {_format_battery(node.battery_percent, node.voltage)}") + if node.packet_count_24h > 0: + metrics.append(f"{node.packet_count_24h} pkts/24h") + if node.channel_utilization is not None: + metrics.append(f"util {node.channel_utilization:.1f}%") + # Add neighbor count from unified node + unified_node = self.data_store.nodes.get(node.node_num) + if unified_node and unified_node.neighbor_count > 0: + metrics.append(f"{unified_node.neighbor_count} neighbors") + + line = f" {status} {name_str} - {', '.join(metrics)}" if not node.is_online: - lines[-1] += " <- OFFLINE" + line += " <- OFFLINE" + lines.append(line) # Channel utilization by locality lines.append("") - mesh_health = self.health_engine.mesh_health - if mesh_health and mesh_health.has_packet_data: + if health.has_packet_data or rs.util_data_available: lines.append(f"Channel Utilization: {rs.util_percent:.0f}%") if region.localities: lines.append(" Localities:") for loc in region.localities: node_count = len(loc.node_ids) - lines.append(f" {loc.name}: {loc.score.util_percent:.0f}% - {node_count} nodes") + lines.append( + f" {loc.name}: {loc.score.util_percent:.0f}% - {node_count} nodes" + ) else: lines.append("Channel Utilization: No data available") # MQTT uplink stats for region - uplink_nodes = [health.nodes.get(nid) for nid in region.node_ids - if health.nodes.get(nid) and health.nodes[nid].uplink_enabled] + uplink_nodes = [ + health.nodes.get(nid) + for nid in region.node_ids + if health.nodes.get(nid) and health.nodes[nid].uplink_enabled + ] lines.append("") lines.append(f"MQTT Uplinks: {len(uplink_nodes)} nodes") @@ -263,34 +580,78 @@ class MeshReporter: if flagged_in_region: lines.append("") - lines.append("Flagged Nodes:") + lines.append("Flagged Nodes (high packet senders):") for node in flagged_in_region[:5]: - lines.append(f" {node.short_name or node.node_id[:4]}: {node.non_text_packets} non-text pkts/24h") + name = _node_display_name(node.long_name, node.short_name, node.node_id) + lines.append( + f" {name}: {node.non_text_packets} non-text pkts/24h" + ) - # Power warnings in this region + # Power warnings in this region (skip USB-powered) low_bat = [] for nid in region.node_ids: node = health.nodes.get(nid) - if node and node.battery_percent is not None and node.battery_percent < self.health_engine.battery_warning_percent: + if ( + node + and node.battery_percent is not None + and node.battery_percent <= 100 # Skip USB powered + and node.battery_percent < self.health_engine.battery_warning_percent + ): low_bat.append(node) if low_bat: lines.append("") - lines.append("Power:") - bat_str = ", ".join(f"{n.short_name or n.node_id[:4]} at {n.battery_percent:.0f}%" for n in low_bat[:4]) + lines.append("Power Warnings:") + bat_str = ", ".join( + f"{_node_display_name(n.long_name, n.short_name, n.node_id)} at {n.battery_percent:.0f}%" + for n in low_bat[:4] + ) lines.append(f" Low battery: {bat_str}") + # Regional environment summary + env_nodes = [ + self.data_store.nodes.get(nid) + for nid in region.node_ids + if self.data_store.nodes.get(nid) and self.data_store.nodes[nid].has_environment_sensor + ] + env_nodes = [n for n in env_nodes if n] # Filter None + + if env_nodes: + # Filter outlier temperatures for aggregation + temps = [n.temperature for n in env_nodes if _is_valid_temperature(n.temperature)] + humids = [n.humidity for n in env_nodes if n.humidity is not None] + pressures = [n.barometric_pressure for n in env_nodes if n.barometric_pressure is not None] + + lines.append("") + lines.append(f"Environment ({len(env_nodes)} sensors):") + if temps: + avg_t = sum(temps) / len(temps) + avg_f = avg_t * 9/5 + 32 + lines.append(f" Temp: {min(temps):.1f}-{max(temps):.1f}C (avg {avg_t:.1f}C / {avg_f:.1f}F)") + if humids: + lines.append(f" Humidity: {min(humids):.0f}-{max(humids):.0f}% (avg {sum(humids)/len(humids):.0f}%)") + if pressures: + lines.append(f" Pressure: {min(pressures):.1f}-{max(pressures):.1f} hPa") + + # Air quality summary + aq_nodes = [ + self.data_store.nodes.get(nid) + for nid in region.node_ids + if self.data_store.nodes.get(nid) and self.data_store.nodes[nid].has_air_quality_sensor + ] + aq_nodes = [n for n in aq_nodes if n] + + if aq_nodes: + pm25s = [n.pm2_5 for n in aq_nodes if n.pm2_5 is not None] + if pm25s: + avg_pm = sum(pm25s) / len(pm25s) + aqi_label = "Good" if avg_pm < 12 else "Moderate" if avg_pm < 35 else "Unhealthy" + lines.append(f" Air Quality: PM2.5 avg {avg_pm:.1f} ug/m3 ({aqi_label})") + return "\n".join(lines) def build_node_detail(self, node_identifier: str) -> str: - """Build detailed info for a specific node. - - Args: - node_identifier: Shortname, longname, nodeId, or nodeNum - - Returns: - Formatted node detail string - """ + """Build detailed info for a specific node with historical data.""" health = self.health_engine.mesh_health if not health: return f"NODE DETAIL: {node_identifier}\nNo data available." @@ -300,11 +661,16 @@ class MeshReporter: if not node: return f"NODE DETAIL: {node_identifier}\nNode not found." + # Get corresponding unified node from data store for historical data + unified = self.data_store.get_node(node.node_num) + + # Header with long name first + display_name = _node_display_name(node.long_name, node.short_name, str(node.node_num)) lines = [ - f"NODE DETAIL: {node.long_name or node.short_name} ({node.short_name})", - f"ID: {node.node_id}", - f"Hardware: {node.role or 'Unknown'}", - f"Role: {'Infrastructure' if node.is_infrastructure else 'Client'}", + f"NODE DETAIL: {display_name}", + f"ID: !{node.node_num:08x} (dec: {node.node_num})", + f"Hardware: {node.hw_model or 'Unknown'}", + f"Role: {node.role} ({'Infrastructure' if node.is_infrastructure else 'Client'})", f"Region: {node.region or 'Unknown'} / Locality: {node.locality or 'Unknown'}", ] @@ -315,25 +681,28 @@ class MeshReporter: status = "Online" if node.is_online else "OFFLINE" lines.append(f"Last Seen: {age} ({status})") - # Get source info from source manager - all_nodes = self.source_manager.get_all_nodes() - sources = [] - for n in all_nodes: - nid = str(n.get("id") or n.get("nodeId") or n.get("num") or "") - if nid == node.node_id: - sources = n.get("_sources", []) - break - if sources: - lines.append(f"Sources: {', '.join(sources)}") + # Sources from unified node + if unified and unified.sources: + lines.append(f"Sources: {', '.join(unified.sources)}") - # Traffic stats + # Traffic stats with historical data lines.append("") - lines.append("Traffic (24h):") - lines.append(f" Total packets: {node.packet_count_24h}") - lines.append(f" Text messages: {node.text_packet_count_24h}") - lines.append(f" Position: {node.position_packet_count_24h}") - lines.append(f" Telemetry: {node.telemetry_packet_count_24h}") - lines.append(f" Other non-text: {node.non_text_packets - node.position_packet_count_24h - node.telemetry_packet_count_24h}") + lines.append("Traffic History:") + lines.append(f" 24h: {node.packet_count_24h} pkts") + if unified: + lines.append(f" 48h: {unified.packets_sent_48h}") + lines.append(f" 7d: {unified.packets_sent_7d}") + lines.append(f" 14d: {unified.packets_sent_14d}") + + # Packet breakdown with clean portnum names + if node.packets_by_portnum: + lines.append("") + lines.append("Packet Breakdown (24h):") + for portnum, count in sorted( + node.packets_by_portnum.items(), key=lambda x: -x[1] + )[:5]: + clean_name = _clean_portnum(portnum) + lines.append(f" {clean_name}: {count}") # Estimated intervals est_pos = node.estimated_position_interval @@ -344,22 +713,34 @@ class MeshReporter: interval_str = f"{int(est_pos / 60)}m" lines.append(f" Est. position interval: {interval_str}") - # Channel utilization from device telemetry - if node.channel_utilization is not None: - lines.append(f" Channel util (device): {node.channel_utilization:.1f}%") - if node.air_util_tx is not None: - lines.append(f" TX airtime: {node.air_util_tx:.1f}%") - - # Power + # RF Metrics section - distinguish channel util from TX airtime lines.append("") - lines.append("Power:") - if node.battery_percent is not None: - bat_status = "Low" if node.battery_percent < 20 else "OK" - lines.append(f" Battery: {node.battery_percent:.0f}% ({bat_status})") + lines.append("RF Metrics:") + if node.channel_utilization is not None: + lines.append(f" Channel Utilization: {node.channel_utilization:.1f}% (RF busyness this node hears)") else: - lines.append(" Battery: N/A") - if node.voltage: - lines.append(f" Voltage: {node.voltage:.2f}V") + lines.append(" Channel Utilization: N/A") + if node.air_util_tx is not None: + lines.append(f" TX Airtime: {node.air_util_tx:.1f}% (this node's transmissions)") + else: + lines.append(" TX Airtime: N/A") + + # Power with battery trend and formatted display + lines.append("") + lines.append("Battery:") + if node.battery_percent is not None: + bat_display = _format_battery(node.battery_percent, node.voltage) + lines.append(f" Current: {bat_display}") + else: + lines.append(" Current: N/A") + + if node.battery_trend: + lines.append(f" Trend: {node.battery_trend}") + if node.predicted_depletion_hours: + lines.append( + f" Predicted depletion: {node.predicted_depletion_hours:.0f} hours" + ) + lines.append(f" Solar: {'Yes' if node.has_solar else 'Unknown'}") # Connectivity @@ -367,8 +748,96 @@ class MeshReporter: lines.append("Connectivity:") lines.append(f" MQTT Uplink: {'Enabled' if node.uplink_enabled else 'Disabled'}") - # Recommendations for this node - recs = self._node_recommendations(node) + # Neighbors section + if unified and unified.neighbors: + lines.append("") + lines.append(f"Neighbors ({unified.neighbor_count}):") + + # Build edge lookup for signal quality + edge_lookup = {} + for e in self.data_store.edges: + edge_lookup[(e.from_node, e.to_node)] = e + edge_lookup[(e.to_node, e.from_node)] = e + + # Build neighbor list with SNR for sorting + neighbor_data = [] + for neighbor_num in unified.neighbors: + neighbor = self.data_store.get_node(neighbor_num) + edge = edge_lookup.get((node.node_num, neighbor_num)) + snr = edge.snr if edge else None + rssi = edge.rssi if edge else None + neighbor_data.append((neighbor_num, neighbor, snr, rssi)) + + # Sort by best SNR first (None values last) + neighbor_data.sort(key=lambda x: (x[2] is None, -(x[2] or -999))) + + # Show first 10 + for neighbor_num, neighbor, snr, rssi in neighbor_data[:10]: + if neighbor: + name = _node_display_name(neighbor.long_name, neighbor.short_name, str(neighbor_num)) + else: + name = f"!{neighbor_num:08x} (unknown)" + + # Build signal info - only SNR and RSSI + parts = [] + if snr is not None: + parts.append(f"SNR {snr:.1f}") + if rssi is not None: + parts.append(f"RSSI {rssi}") + + if parts: + lines.append(f" {name} [{', '.join(parts)}]") + else: + lines.append(f" {name}") + + if len(neighbor_data) > 10: + lines.append(f" ...and {len(neighbor_data) - 10} more") + + # Environment section (from unified node sensor data) + if unified: + env_lines = [] + if unified.temperature is not None: + temp_str = _format_temperature(unified.temperature) + env_lines.append(f"Temp: {temp_str}") + if unified.humidity is not None: + env_lines.append(f"Humidity: {unified.humidity:.1f}%") + if unified.barometric_pressure is not None: + env_lines.append(f"Pressure: {unified.barometric_pressure:.1f} hPa") + if unified.gas_resistance is not None: + env_lines.append(f"Gas Resistance: {unified.gas_resistance:.0f} Ohm") + if unified.iaq is not None: + iaq_label = "Good" if unified.iaq < 50 else "Moderate" if unified.iaq < 100 else "Poor" + env_lines.append(f"IAQ: {unified.iaq:.0f} ({iaq_label})") + if unified.light_lux is not None: + env_lines.append(f"Light: {unified.light_lux:.0f} lux") + if unified.wind_speed is not None: + env_lines.append(f"Wind: {unified.wind_speed:.1f} m/s") + if unified.wind_direction is not None: + env_lines.append(f"Wind Dir: {unified.wind_direction:.0f} deg") + if unified.rainfall is not None: + env_lines.append(f"Rainfall: {unified.rainfall:.1f} mm") + if unified.pm2_5 is not None: + aqi_label = "Good" if unified.pm2_5 < 12 else "Moderate" if unified.pm2_5 < 35 else "Unhealthy" + env_lines.append(f"PM2.5: {unified.pm2_5:.1f} ug/m3 ({aqi_label})") + if unified.pm10 is not None: + env_lines.append(f"PM10: {unified.pm10:.1f} ug/m3") + if unified.ext_voltage is not None: + env_lines.append(f"Ext Voltage: {unified.ext_voltage:.2f}V") + if unified.ext_current is not None: + env_lines.append(f"Ext Current: {unified.ext_current:.1f}mA") + if unified.uv_index is not None: + env_lines.append(f"UV Index: {unified.uv_index:.1f}") + if unified.radiation_cpm is not None: + env_lines.append(f"Radiation: {unified.radiation_cpm:.0f} CPM") + + if env_lines: + lines.append("") + lines.append("Environment:") + for el in env_lines: + lines.append(f" {el}") + + # Recommendations for this node (trend-aware) + recs = self._node_recommendations(node, unified) if recs: lines.append("") lines.append("Recommendations:") @@ -377,31 +846,65 @@ class MeshReporter: return "\n".join(lines) - def _node_recommendations(self, node) -> list[str]: - """Generate recommendations for a specific node.""" + def _node_recommendations(self, node: "NodeHealth", unified=None) -> list[str]: + """Generate recommendations for a specific node. + + Args: + node: NodeHealth instance + unified: Optional UnifiedNode for historical data + """ recs = [] - # High packet count + # High packet count with trend context if node.non_text_packets > self.health_engine.packet_threshold: ratio = node.non_text_packets / self.health_engine.packet_threshold - recs.append(f"Sending {ratio:.1f}x normal packets. Check position/telemetry intervals.") + + # Check if trending up + trend_note = "" + if unified: + avg_7d = unified.packets_sent_7d / 7 if unified.packets_sent_7d else 0 + if avg_7d > 0 and node.packet_count_24h > avg_7d * 1.5: + trend_note = " (trending up vs 7d avg)" + + recs.append( + f"Sending {ratio:.1f}x normal packets{trend_note}. Check position/telemetry intervals." + ) # Position interval too frequent (< 300s = 5 min) est_interval = node.estimated_position_interval if est_interval is not None and est_interval < 300: - recs.append(f"Position interval ~{int(est_interval)}s is aggressive. Recommend 900s (15 min) for battery life.") + recs.append( + f"Position interval ~{int(est_interval)}s is aggressive. " + f"Recommend 900s (15 min) for battery life." + ) - # High channel utilization on this node + # High channel utilization on this node (RF busyness it hears) if node.channel_utilization is not None and node.channel_utilization > 25: - recs.append(f"Channel utilization {node.channel_utilization:.0f}% - consider moving to less congested frequency.") + recs.append( + f"Channel utilization {node.channel_utilization:.0f}% (RF busyness) - " + f"this node's RF environment is congested." + ) # High air_util_tx (this node transmitting a lot) if node.air_util_tx is not None and node.air_util_tx > 10: - recs.append(f"TX airtime {node.air_util_tx:.1f}% - reduce telemetry frequency to be a better mesh citizen.") + recs.append( + f"TX airtime {node.air_util_tx:.1f}% - " + f"reduce telemetry frequency to be a better mesh citizen." + ) - # Low battery - if node.battery_percent is not None and node.battery_percent < 20: - recs.append(f"Battery at {node.battery_percent:.0f}%. Consider charging or adding solar.") + # Low battery (skip USB-powered) + if node.battery_percent is not None and node.battery_percent <= 100 and node.battery_percent < 20: + recs.append( + f"Battery at {node.battery_percent:.0f}%. Consider charging or adding solar." + ) + + # Declining battery trend + if node.battery_trend == "declining": + hrs = node.predicted_depletion_hours + if hrs and hrs < 48: + recs.append( + f"Battery declining - estimated depletion in {hrs:.0f} hours." + ) # Offline if not node.is_online: @@ -410,20 +913,42 @@ class MeshReporter: # Infrastructure node without MQTT uplink if node.is_infrastructure and not node.uplink_enabled: - recs.append("Infrastructure node without MQTT uplink. Consider enabling for better mesh visibility.") + recs.append( + "Infrastructure node without MQTT uplink. " + "Consider enabling for better mesh visibility." + ) + + # Environmental recommendations (from unified node) + if unified: + # Freezing temperature warning for battery nodes + if unified.temperature is not None and unified.temperature < 0: + if unified.battery_percent is not None and unified.battery_percent <= 100: + recs.append( + f"Temperature {unified.temperature:.1f}C - below freezing reduces battery capacity 20-40%." + ) + + # High humidity condensation risk + if unified.humidity is not None and unified.humidity > 90: + recs.append( + f"Humidity at {unified.humidity:.0f}% - condensation risk. Ensure enclosure is sealed." + ) + + # Poor air quality + if unified.pm2_5 is not None and unified.pm2_5 > 35: + recs.append( + f"PM2.5 at {unified.pm2_5:.1f} ug/m3 - unhealthy air quality in this area." + ) + + # High wind + if unified.wind_speed is not None and unified.wind_speed > 20: + recs.append( + f"Wind speed {unified.wind_speed:.1f} m/s - check antenna mounting and cable strain relief." + ) return recs def build_recommendations(self, scope: str, scope_value: str = None) -> str: - """Generate actionable optimization recommendations. - - Args: - scope: "mesh", "region", or "node" - scope_value: Region name or node identifier (for scoped recommendations) - - Returns: - Formatted recommendations string - """ + """Generate actionable optimization recommendations.""" health = self.health_engine.mesh_health if not health: return "" @@ -432,8 +957,9 @@ class MeshReporter: if scope == "node" and scope_value: node = self._find_node(scope_value) + unified = self.data_store.get_node(node.node_num) if node else None if node: - recs.extend(self._node_recommendations(node)) + recs.extend(self._node_recommendations(node, unified)) elif scope == "region" and scope_value: region = self._find_region(scope_value) @@ -452,28 +978,40 @@ class MeshReporter: return "\n".join(lines) - def _region_recommendations(self, region, health) -> list[str]: + def _region_recommendations( + self, region: "RegionHealth", health + ) -> list[str]: """Generate recommendations for a region.""" recs = [] - # High utilization + # High utilization with trend context if region.score.util_percent >= 20: - recs.append(f"Channel utilization at {region.score.util_percent:.0f}%. Consider spreading nodes across frequencies or reducing telemetry intervals.") + recs.append( + f"Channel utilization at {region.score.util_percent:.0f}%. " + f"Consider spreading nodes across frequencies or reducing telemetry intervals." + ) # Offline infrastructure offline_count = region.score.infra_total - region.score.infra_online if offline_count > 0: - recs.append(f"{offline_count} infrastructure node(s) offline. Check power and connectivity.") + recs.append( + f"{offline_count} infrastructure node(s) offline. Check power and connectivity." + ) - # Flagged nodes + # Flagged nodes (high packet senders) flagged = [] for nid in region.node_ids: node = health.nodes.get(nid) if node and node.non_text_packets > self.health_engine.packet_threshold: flagged.append(node) if flagged: - names = ", ".join(n.short_name or n.node_id[:4] for n in flagged[:3]) - recs.append(f"High-traffic nodes ({names}) impacting channel. Review their telemetry settings.") + names = ", ".join( + _node_display_name(n.long_name, n.short_name, n.node_id) + for n in flagged[:3] + ) + recs.append( + f"High-traffic nodes ({names}) impacting channel. Review their telemetry settings." + ) # Check for nodes with aggressive position intervals aggressive_interval_nodes = [] @@ -484,80 +1022,123 @@ class MeshReporter: if est is not None and est < 300: aggressive_interval_nodes.append(node) if aggressive_interval_nodes: - names = ", ".join(n.short_name or n.node_id[:4] for n in aggressive_interval_nodes[:3]) - recs.append(f"Nodes with frequent position broadcasts ({names}). Recommend 900s interval.") + names = ", ".join( + _node_display_name(n.long_name, n.short_name, n.node_id) + for n in aggressive_interval_nodes[:3] + ) + recs.append( + f"Nodes with frequent position broadcasts ({names}). Recommend 900s interval." + ) # Check MQTT/uplink coverage in region - infra_nodes = [health.nodes.get(nid) for nid in region.node_ids - if health.nodes.get(nid) and health.nodes[nid].is_infrastructure] + infra_nodes = [ + health.nodes.get(nid) + for nid in region.node_ids + if health.nodes.get(nid) and health.nodes[nid].is_infrastructure + ] uplink_count = sum(1 for n in infra_nodes if n and n.uplink_enabled) if infra_nodes and uplink_count == 0: - recs.append("No MQTT uplinks in region. Consider enabling on at least one infrastructure node.") + recs.append( + "No MQTT uplinks in region. Consider enabling on at least one infrastructure node." + ) elif len(infra_nodes) >= 3 and uplink_count == 1: - recs.append(f"Only 1/{len(infra_nodes)} infrastructure nodes with MQTT uplink. Consider adding redundancy.") + recs.append( + f"Only 1/{len(infra_nodes)} infrastructure nodes with MQTT uplink. " + f"Consider adding redundancy." + ) return recs def _mesh_recommendations(self, health) -> list[str]: - """Generate mesh-wide recommendations.""" + """Generate mesh-wide recommendations with trend awareness.""" recs = [] # Overall utilization if health.score.util_percent >= 20: - recs.append(f"Mesh-wide utilization at {health.score.util_percent:.0f}%. Consider reducing position/telemetry broadcast frequency.") + recs.append( + f"Mesh-wide utilization at {health.score.util_percent:.0f}%. " + f"Consider reducing position/telemetry broadcast frequency." + ) + + # Traffic trend recommendation + trend = self._get_traffic_trend_summary() + if "up" in trend and "15" in trend: # Significant increase + recs.append( + f"Traffic {trend}. Review recently added nodes or changed settings." + ) # Multiple regions with issues problem_regions = [r for r in health.regions if r.score.composite < 75] if len(problem_regions) > 1: names = ", ".join(r.name for r in problem_regions[:3]) - recs.append(f"Multiple regions degraded ({names}). Prioritize infrastructure improvements.") + recs.append( + f"Multiple regions degraded ({names}). Prioritize infrastructure improvements." + ) # High packet nodes mesh-wide flagged = self.health_engine.get_flagged_nodes() if len(flagged) > 3: - total_excess = sum(n.non_text_packets - self.health_engine.packet_threshold for n in flagged) - recs.append(f"{len(flagged)} nodes exceeding packet threshold ({total_excess} excess packets/day). Review default telemetry intervals.") + total_excess = sum( + n.non_text_packets - self.health_engine.packet_threshold for n in flagged + ) + recs.append( + f"{len(flagged)} nodes exceeding packet threshold ({total_excess} excess packets/day). " + f"Review default telemetry intervals." + ) - # Battery warnings - battery_warnings = self.health_engine.get_battery_warnings() + # Battery warnings (exclude USB-powered) + battery_warnings = [ + n for n in self.health_engine.get_battery_warnings() + if n.battery_percent is not None and n.battery_percent <= 100 + ] if len(battery_warnings) > 2: - recs.append(f"{len(battery_warnings)} nodes with low battery. Consider solar additions for remote nodes.") + recs.append( + f"{len(battery_warnings)} nodes with low battery. " + f"Consider solar additions for remote nodes." + ) # Hop count recommendation from traceroutes if health.has_traceroute_data: if health.avg_hop_count > 4: - recs.append(f"Average hop count {health.avg_hop_count:.1f} is high. Consider adding infrastructure to reduce latency.") + recs.append( + f"Average hop count {health.avg_hop_count:.1f} is high. " + f"Consider adding infrastructure to reduce latency." + ) elif health.max_hop_count > 6: - recs.append(f"Max hop count {health.max_hop_count} indicates long routes. Strategic node placement could improve reach.") + recs.append( + f"Max hop count {health.max_hop_count} indicates long routes. " + f"Strategic node placement could improve reach." + ) # MQTT uplink coverage if health.uplink_node_count == 0: total_infra = sum(1 for n in health.nodes.values() if n.is_infrastructure) if total_infra > 0: - recs.append("No MQTT uplinks detected. Enable on infrastructure nodes for better mesh visibility.") + recs.append( + "No MQTT uplinks detected. Enable on infrastructure nodes for better mesh visibility." + ) elif health.total_regions > 0: uplinks_per_region = health.uplink_node_count / health.total_regions if uplinks_per_region < 1: - recs.append(f"Only {health.uplink_node_count} MQTT uplinks across {health.total_regions} regions. Consider adding redundancy.") + recs.append( + f"Only {health.uplink_node_count} MQTT uplinks across " + f"{health.total_regions} regions. Consider adding redundancy." + ) - # Aggressive position intervals mesh-wide - aggressive_nodes = [n for n in health.nodes.values() - if n.estimated_position_interval is not None and n.estimated_position_interval < 300] - if len(aggressive_nodes) > 5: - recs.append(f"{len(aggressive_nodes)} nodes with position interval <5min. Recommend 15min (900s) as default.") + # Mesh-wide deliverability/coverage + deliver = self.data_store.get_mesh_deliverability() + if deliver.get("avg_gateways") is not None: + avg_gw = deliver["avg_gateways"] + if avg_gw < 1.5: + recs.append( + f"Mesh-wide average is {avg_gw:.1f} gateways per packet. " + f"Adding MQTT feeders would improve monitoring reliability." + ) return recs def build_lora_compact(self, scope: str, scope_value: str = None) -> str: - """Build LoRa-optimized compact summary (~200 chars). - - Args: - scope: "mesh" or "region" - scope_value: Region name if scope is "region" - - Returns: - Compact formatted string - """ + """Build LoRa-optimized compact summary (~200 chars).""" health = self.health_engine.mesh_health if not health: return "Mesh: No data" @@ -567,29 +1148,103 @@ class MeshReporter: if not region: return f"Region '{scope_value}' not found" rs = region.score - return f"{region.name} {rs.composite:.0f}/100 | {rs.infra_online}/{rs.infra_total} infra | {rs.util_percent:.0f}% util" + return ( + f"{region.name} {rs.composite:.0f}/100 | " + f"{rs.infra_online}/{rs.infra_total} infra | {rs.util_percent:.0f}% util" + ) # Mesh summary s = health.score - lines = [f"Mesh {s.composite:.0f}/100 | {s.infra_online}/{s.infra_total} infra | {s.util_percent:.0f}% util"] + lines = [ + f"Mesh {s.composite:.0f}/100 | {s.infra_online}/{s.infra_total} infra | {s.util_percent:.0f}% util" + ] # Add warnings for problem regions/nodes warnings = [] for region in health.regions: if region.score.composite < 60: offline = region.score.infra_total - region.score.infra_online - warnings.append(f"! {region.name} {region.score.composite:.0f}/100 - {offline} infra offline") + warnings.append( + f"! {region.name} {region.score.composite:.0f}/100 - {offline} infra offline" + ) - battery_warnings = self.health_engine.get_battery_warnings() + # Battery warnings (skip USB-powered) + battery_warnings = [ + n for n in self.health_engine.get_battery_warnings() + if n.battery_percent is not None and n.battery_percent <= 100 + ] for node in battery_warnings[:2]: - warnings.append(f"! {node.short_name or node.node_id[:4]} bat {node.battery_percent:.0f}%") + name = node.short_name or node.node_id[:4] + warnings.append( + f"! {name} bat {node.battery_percent:.0f}%" + ) for w in warnings[:2]: lines.append(w) return "\n".join(lines) - def _find_region(self, name: str): + def build_node_compact(self, node_identifier: str) -> str: + """Build compact node status for subscription DMs (~200 chars).""" + health = self.health_engine.mesh_health + if not health: + return "Node: No data" + + node = self._find_node(node_identifier) + if not node: + return f"Node '{node_identifier}' not found" + + unified = self.data_store.get_node(node.node_num) + + # Build compact status + display_name = node.short_name or node.long_name or f"!{node.node_num:08x}" + status = "ON" if node.is_online else "OFF" + age = _format_age(node.last_seen) + + parts = [f"{display_name} [{status}]"] + + # Battery (skip USB) + if node.battery_percent is not None: + if node.battery_percent > 100: + parts.append("USB") + else: + parts.append(f"bat {node.battery_percent:.0f}%") + + # Last seen + parts.append(f"seen {age}") + + # Traffic + if node.packet_count_24h > 0: + parts.append(f"{node.packet_count_24h} pkts/24h") + + # Channel util + if node.channel_utilization is not None: + parts.append(f"util {node.channel_utilization:.0f}%") + + # Neighbors + if unified and unified.neighbor_count > 0: + parts.append(f"{unified.neighbor_count} nbrs") + + line1 = " | ".join(parts) + + # Warnings if any + warnings = [] + if not node.is_online: + warnings.append("! OFFLINE") + elif node.battery_percent is not None and node.battery_percent <= 20 and node.battery_percent <= 100: + warnings.append("! LOW BAT") + if node.non_text_packets > self.health_engine.packet_threshold: + warnings.append("! HIGH TRAFFIC") + + if warnings: + return f"{line1}\n{' '.join(warnings)}" + return line1 + + def build_region_compact(self, region_name: str) -> str: + """Build compact region status for subscription DMs (~200 chars).""" + return self.build_lora_compact(scope="region", scope_value=region_name) + + def _find_region(self, name: str) -> Optional["RegionHealth"]: """Find a region by fuzzy name match.""" health = self.health_engine.mesh_health if not health: @@ -609,17 +1264,15 @@ class MeshReporter: # Try matching against anchor city names for anchor in self.health_engine.regions: - # Check if search term matches anchor city or region name anchor_name_lower = anchor.name.lower() if name_lower in anchor_name_lower: - # Find the corresponding region for region in health.regions: if region.name == anchor.name: return region return None - def _find_node(self, identifier: str): + def _find_node(self, identifier: str) -> Optional["NodeHealth"]: """Find a node by shortname, longname, nodeId, or nodeNum.""" health = self.health_engine.mesh_health if not health: @@ -651,14 +1304,10 @@ class MeshReporter: # Try decimal nodeNum if identifier.isdigit(): - # Convert to hex and search - try: - hex_id = format(int(identifier), 'x') - for nid, node in health.nodes.items(): - if hex_id in nid.lower(): - return node - except ValueError: - pass + node_num = int(identifier) + node_id = str(node_num) + if node_id in health.nodes: + return health.nodes[node_id] return None diff --git a/meshai/subscriptions.py b/meshai/subscriptions.py new file mode 100644 index 0000000..e0bbf7f --- /dev/null +++ b/meshai/subscriptions.py @@ -0,0 +1,278 @@ +"""Subscription management for scheduled reports and alerts.""" + +import logging +import sqlite3 +import time +from typing import Optional + +logger = logging.getLogger(__name__) + +# Valid subscription types +VALID_SUB_TYPES = {"daily", "weekly", "alerts"} +VALID_DAYS = {"mon", "tue", "wed", "thu", "fri", "sat", "sun"} +VALID_SCOPE_TYPES = {"mesh", "region", "node"} + + +class SubscriptionManager: + """Manages user subscriptions with SQLite storage.""" + + def __init__(self, db_path: str): + """Initialize subscription manager. + + Args: + db_path: Path to SQLite database (same as mesh_history.db) + """ + self._db_path = db_path + self._db: Optional[sqlite3.Connection] = None + self._init_db() + + def _init_db(self): + """Initialize database connection and schema.""" + self._db = sqlite3.connect(self._db_path, check_same_thread=False) + self._db.row_factory = sqlite3.Row + + self._db.executescript(""" + CREATE TABLE IF NOT EXISTS subscriptions ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id TEXT NOT NULL, + sub_type TEXT NOT NULL, + schedule_time TEXT, + schedule_day TEXT, + scope_type TEXT DEFAULT 'mesh', + scope_value TEXT, + created_at REAL NOT NULL, + last_sent REAL DEFAULT 0, + enabled INTEGER DEFAULT 1 + ); + CREATE INDEX IF NOT EXISTS idx_sub_user ON subscriptions(user_id); + CREATE INDEX IF NOT EXISTS idx_sub_type ON subscriptions(sub_type); + """) + self._db.commit() + logger.info("Subscription manager initialized") + + def _row_to_dict(self, row: sqlite3.Row) -> dict: + """Convert sqlite Row to dict.""" + return dict(row) + + def add(self, user_id: str, sub_type: str, schedule_time: str = None, + schedule_day: str = None, scope_type: str = "mesh", + scope_value: str = None) -> dict: + """Add a subscription. + + Args: + user_id: Subscriber node_num + sub_type: "daily", "weekly", or "alerts" + schedule_time: HHMM format (required for daily/weekly) + schedule_day: mon-sun (required for weekly) + scope_type: "mesh", "region", or "node" + scope_value: Region name or node identifier + + Returns: + Created subscription dict + + Raises: + ValueError: If validation fails + """ + # Validate sub_type + if sub_type not in VALID_SUB_TYPES: + raise ValueError(f"Invalid type '{sub_type}'. Use: daily, weekly, or alerts") + + # Validate schedule_time for daily/weekly + if sub_type in ("daily", "weekly"): + if not schedule_time: + raise ValueError(f"Time required for {sub_type} subscription. Use HHMM format (e.g., 1830)") + if not self._validate_time(schedule_time): + raise ValueError("Invalid time format. Use HHMM (e.g., 1830 for 6:30 PM)") + + # Validate schedule_day for weekly + if sub_type == "weekly": + if not schedule_day: + raise ValueError("Day required for weekly subscription. Use: mon, tue, wed, thu, fri, sat, sun") + if schedule_day.lower() not in VALID_DAYS: + raise ValueError("Invalid day. Use: mon, tue, wed, thu, fri, sat, sun") + schedule_day = schedule_day.lower() + + # Validate scope_type + if scope_type not in VALID_SCOPE_TYPES: + raise ValueError(f"Invalid scope '{scope_type}'. Use: mesh, region, or node") + + # Check for duplicates + existing = self._db.execute(""" + SELECT id FROM subscriptions + WHERE user_id = ? AND sub_type = ? AND scope_type = ? + AND (scope_value = ? OR (scope_value IS NULL AND ? IS NULL)) + AND enabled = 1 + """, (user_id, sub_type, scope_type, scope_value, scope_value)).fetchone() + + if existing: + scope_desc = f" for {scope_type} {scope_value}" if scope_value else "" + raise ValueError(f"Already subscribed to {sub_type}{scope_desc}") + + # Insert subscription + now = time.time() + cursor = self._db.execute(""" + INSERT INTO subscriptions (user_id, sub_type, schedule_time, schedule_day, + scope_type, scope_value, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?) + """, (user_id, sub_type, schedule_time, schedule_day, scope_type, scope_value, now)) + self._db.commit() + + sub_id = cursor.lastrowid + return self._get_by_id(sub_id) + + def _validate_time(self, time_str: str) -> bool: + """Validate HHMM time format.""" + if not time_str or len(time_str) != 4 or not time_str.isdigit(): + return False + hours = int(time_str[:2]) + minutes = int(time_str[2:]) + return 0 <= hours <= 23 and 0 <= minutes <= 59 + + def _get_by_id(self, sub_id: int) -> dict: + """Get subscription by ID.""" + row = self._db.execute( + "SELECT * FROM subscriptions WHERE id = ?", (sub_id,) + ).fetchone() + return self._row_to_dict(row) if row else None + + def remove(self, user_id: str, sub_type: str = None) -> int: + """Remove subscription(s). + + Args: + user_id: Subscriber node_num + sub_type: "daily", "weekly", "alerts", or None for all + + Returns: + Number of subscriptions removed + """ + if sub_type and sub_type != "all": + cursor = self._db.execute( + "DELETE FROM subscriptions WHERE user_id = ? AND sub_type = ?", + (user_id, sub_type) + ) + else: + cursor = self._db.execute( + "DELETE FROM subscriptions WHERE user_id = ?", + (user_id,) + ) + self._db.commit() + return cursor.rowcount + + def get_user_subs(self, user_id: str) -> list[dict]: + """Get all subscriptions for a user.""" + rows = self._db.execute( + "SELECT * FROM subscriptions WHERE user_id = ? AND enabled = 1 ORDER BY created_at", + (user_id,) + ).fetchall() + return [self._row_to_dict(r) for r in rows] + + def get_due_subscriptions(self, current_time_hhmm: str, current_day: str) -> list[dict]: + """Get subscriptions that should fire right now. + + Args: + current_time_hhmm: Current time as "HHMM" (e.g., "1830") + current_day: Current day as 3-letter lowercase (e.g., "sun") + + Returns: + List of subscription dicts that are due + """ + now = time.time() + due = [] + + # Get all daily/weekly subscriptions + rows = self._db.execute(""" + SELECT * FROM subscriptions + WHERE sub_type IN ('daily', 'weekly') AND enabled = 1 + """).fetchall() + + current_minutes = int(current_time_hhmm[:2]) * 60 + int(current_time_hhmm[2:]) + + for row in rows: + sub = self._row_to_dict(row) + schedule_time = sub.get("schedule_time") + if not schedule_time: + continue + + schedule_minutes = int(schedule_time[:2]) * 60 + int(schedule_time[2:]) + + # 5-minute matching window + if abs(schedule_minutes - current_minutes) > 5: + continue + + sub_type = sub["sub_type"] + last_sent = sub.get("last_sent", 0) or 0 + + if sub_type == "daily": + # Don't fire if sent within last 23 hours + if now - last_sent < 23 * 3600: + continue + due.append(sub) + + elif sub_type == "weekly": + # Check day matches + schedule_day = sub.get("schedule_day", "").lower() + if schedule_day != current_day.lower(): + continue + # Don't fire if sent within last 6 days + if now - last_sent < 6 * 24 * 3600: + continue + due.append(sub) + + return due + + def get_alert_subscribers(self, scope_type: str = None, scope_value: str = None) -> list[dict]: + """Get users subscribed to alerts matching a scope. + + Args: + scope_type: "mesh", "region", or "node" + scope_value: Region name or node identifier + + Returns: + List of subscription dicts where scope matches + """ + # Get all alert subscriptions + rows = self._db.execute(""" + SELECT * FROM subscriptions + WHERE sub_type = 'alerts' AND enabled = 1 + """).fetchall() + + matching = [] + for row in rows: + sub = self._row_to_dict(row) + sub_scope = sub.get("scope_type", "mesh") + sub_value = sub.get("scope_value") + + # Mesh scope gets ALL alerts + if sub_scope == "mesh": + matching.append(sub) + # Region scope gets alerts for that region + elif sub_scope == "region" and scope_type == "region": + if sub_value and scope_value and sub_value.lower() == scope_value.lower(): + matching.append(sub) + # Node scope gets alerts for that node + elif sub_scope == "node" and scope_type == "node": + if sub_value and scope_value and sub_value.lower() == scope_value.lower(): + matching.append(sub) + + return matching + + def mark_sent(self, subscription_id: int): + """Update last_sent timestamp to now.""" + self._db.execute( + "UPDATE subscriptions SET last_sent = ? WHERE id = ?", + (time.time(), subscription_id) + ) + self._db.commit() + + def get_all_subs(self) -> list[dict]: + """Get all subscriptions (for admin view).""" + rows = self._db.execute( + "SELECT * FROM subscriptions WHERE enabled = 1 ORDER BY user_id, created_at" + ).fetchall() + return [self._row_to_dict(r) for r in rows] + + def close(self): + """Close database connection.""" + if self._db: + self._db.close() + self._db = None