Merge subscriptions from main into feature branch (with full data pipeline)

This commit is contained in:
K7ZVX 2026-05-05 02:57:41 +00:00
commit af2f66d71d
5 changed files with 1619 additions and 224 deletions

View file

@ -158,6 +158,9 @@ def create_dispatcher(
disabled_commands: Optional[list[str]] = None, disabled_commands: Optional[list[str]] = None,
custom_commands: Optional[dict] = None, custom_commands: Optional[dict] = None,
mesh_reporter=None, mesh_reporter=None,
data_store=None,
health_engine=None,
subscription_manager=None,
) -> CommandDispatcher: ) -> CommandDispatcher:
"""Create and populate command dispatcher with default commands. """Create and populate command dispatcher with default commands.
@ -166,6 +169,9 @@ def create_dispatcher(
disabled_commands: List of command names to disable disabled_commands: List of command names to disable
custom_commands: Dict of name -> response for custom commands custom_commands: Dict of name -> response for custom commands
mesh_reporter: MeshReporter instance for health commands mesh_reporter: MeshReporter instance for health commands
data_store: MeshDataStore for neighbor data
health_engine: MeshHealthEngine for infrastructure detection
subscription_manager: SubscriptionManager for subscription commands
Returns: Returns:
Configured CommandDispatcher Configured CommandDispatcher
@ -176,7 +182,8 @@ def create_dispatcher(
from .reset import ResetCommand from .reset import ResetCommand
from .status import StatusCommand from .status import StatusCommand
from .weather import WeatherCommand from .weather import WeatherCommand
from .health import HealthCommand, RegionCommand from .health import HealthCommand, RegionCommand, NeighborCommand
from .subscribe import SubCommand, UnsubCommand, MySubsCommand
dispatcher = CommandDispatcher(prefix=prefix, disabled_commands=disabled_commands) dispatcher = CommandDispatcher(prefix=prefix, disabled_commands=disabled_commands)
@ -205,6 +212,37 @@ def create_dispatcher(
alias_handler.name = alias alias_handler.name = alias
dispatcher.register(alias_handler) dispatcher.register(alias_handler)
# Register neighbors command
neighbor_cmd = NeighborCommand(mesh_reporter, data_store, health_engine)
dispatcher.register(neighbor_cmd)
# Register aliases for neighbors command
for alias in getattr(neighbor_cmd, 'aliases', []):
alias_handler = NeighborCommand(mesh_reporter, data_store, health_engine)
alias_handler.name = alias
dispatcher.register(alias_handler)
# Register subscription commands
sub_cmd = SubCommand(subscription_manager, mesh_reporter, data_store)
dispatcher.register(sub_cmd)
for alias in getattr(sub_cmd, 'aliases', []):
alias_handler = SubCommand(subscription_manager, mesh_reporter, data_store)
alias_handler.name = alias
dispatcher.register(alias_handler)
unsub_cmd = UnsubCommand(subscription_manager)
dispatcher.register(unsub_cmd)
for alias in getattr(unsub_cmd, 'aliases', []):
alias_handler = UnsubCommand(subscription_manager)
alias_handler.name = alias
dispatcher.register(alias_handler)
mysubs_cmd = MySubsCommand(subscription_manager)
dispatcher.register(mysubs_cmd)
for alias in getattr(mysubs_cmd, 'aliases', []):
alias_handler = MySubsCommand(subscription_manager)
alias_handler.name = alias
dispatcher.register(alias_handler)
# Register custom commands # Register custom commands
if custom_commands: if custom_commands:
for name, response in custom_commands.items(): for name, response in custom_commands.items():

View file

@ -0,0 +1,322 @@
"""Subscription commands for scheduled reports and alerts."""
from typing import TYPE_CHECKING
from .base import CommandContext, CommandHandler
if TYPE_CHECKING:
from ..mesh_data_store import MeshDataStore
from ..mesh_reporter import MeshReporter
from ..subscriptions import SubscriptionManager
class SubCommand(CommandHandler):
"""Subscribe to scheduled reports or alerts."""
name = "sub"
description = "Subscribe to reports or alerts"
usage = "!sub daily|weekly|alerts [time] [day] [scope]"
aliases = ["subscribe"]
def __init__(
self,
subscription_manager: "SubscriptionManager" = None,
mesh_reporter: "MeshReporter" = None,
data_store: "MeshDataStore" = None,
):
self._sub_manager = subscription_manager
self._reporter = mesh_reporter
self._data_store = data_store
async def execute(self, args: str, context: CommandContext) -> str:
"""Handle subscription command."""
if not self._sub_manager:
return "Subscriptions not available."
parts = args.strip().split()
if not parts:
return self._usage_help()
sub_type = parts[0].lower()
if sub_type not in ("daily", "weekly", "alerts"):
return f"Invalid type '{sub_type}'. Use: daily, weekly, or alerts"
try:
if sub_type == "daily":
return self._handle_daily(parts[1:], context)
elif sub_type == "weekly":
return self._handle_weekly(parts[1:], context)
else: # alerts
return self._handle_alerts(parts[1:], context)
except ValueError as e:
return f"Error: {e}"
def _usage_help(self) -> str:
"""Return usage help."""
return """Usage:
!sub daily 1830 - daily mesh report at 6:30 PM
!sub daily 1830 region SCID - daily region report
!sub daily 1830 node MHR - daily node report
!sub weekly 0800 sun - weekly digest Sunday 8 AM
!sub alerts - mesh-wide alerts
!sub alerts region SCID - alerts for a region"""
def _handle_daily(self, args: list, context: CommandContext) -> str:
"""Handle daily subscription."""
if not args:
raise ValueError("Time required. Example: !sub daily 1830")
schedule_time = args[0]
scope_type, scope_value = self._parse_scope(args[1:])
# Validate scope
scope_value = self._validate_scope(scope_type, scope_value)
result = self._sub_manager.add(
user_id=self._get_user_id(context),
sub_type="daily",
schedule_time=schedule_time,
scope_type=scope_type,
scope_value=scope_value,
)
time_fmt = self._format_time(schedule_time)
scope_desc = self._format_scope(scope_type, scope_value)
return f"Subscribed: daily {scope_desc}report at {time_fmt}"
def _handle_weekly(self, args: list, context: CommandContext) -> str:
"""Handle weekly subscription."""
if len(args) < 2:
raise ValueError("Time and day required. Example: !sub weekly 0800 sun")
schedule_time = args[0]
schedule_day = args[1].lower()
scope_type, scope_value = self._parse_scope(args[2:])
# Validate scope
scope_value = self._validate_scope(scope_type, scope_value)
result = self._sub_manager.add(
user_id=self._get_user_id(context),
sub_type="weekly",
schedule_time=schedule_time,
schedule_day=schedule_day,
scope_type=scope_type,
scope_value=scope_value,
)
time_fmt = self._format_time(schedule_time)
day_fmt = schedule_day.capitalize()
scope_desc = self._format_scope(scope_type, scope_value)
return f"Subscribed: weekly {scope_desc}report at {time_fmt} {day_fmt}"
def _handle_alerts(self, args: list, context: CommandContext) -> str:
"""Handle alerts subscription."""
scope_type, scope_value = self._parse_scope(args)
# Validate scope
scope_value = self._validate_scope(scope_type, scope_value)
result = self._sub_manager.add(
user_id=self._get_user_id(context),
sub_type="alerts",
scope_type=scope_type,
scope_value=scope_value,
)
scope_desc = self._format_scope(scope_type, scope_value)
return f"Subscribed: alerts for {scope_desc.strip() or 'mesh'}"
def _parse_scope(self, args: list) -> tuple[str, str]:
"""Parse scope from remaining args.
Returns:
(scope_type, scope_value) tuple
"""
if not args:
return "mesh", None
# Look for 'region' or 'node' keyword
scope_type = "mesh"
scope_value = None
for i, arg in enumerate(args):
arg_lower = arg.lower()
if arg_lower == "region":
scope_type = "region"
# Everything after 'region' is the region name
scope_value = " ".join(args[i + 1:]) if i + 1 < len(args) else None
break
elif arg_lower == "node":
scope_type = "node"
# Next arg is the node identifier
scope_value = args[i + 1] if i + 1 < len(args) else None
break
return scope_type, scope_value
def _validate_scope(self, scope_type: str, scope_value: str) -> str:
"""Validate and resolve scope value.
Returns:
Resolved scope_value (e.g., full region name)
Raises:
ValueError: If scope not found
"""
if scope_type == "mesh":
return None
if not scope_value:
raise ValueError(f"Missing {scope_type} name")
if scope_type == "region" and self._reporter:
region = self._reporter._find_region(scope_value)
if not region:
# List available regions
health = self._reporter.health_engine.mesh_health
if health:
available = [r.name for r in health.regions if r.node_ids]
return scope_value # Use as-is, will fail at delivery if invalid
raise ValueError(f"Region '{scope_value}' not found")
return region.name # Return canonical name
if scope_type == "node" and self._reporter:
node = self._reporter._find_node(scope_value)
if not node:
raise ValueError(f"Node '{scope_value}' not found")
return node.short_name or str(node.node_num)
return scope_value
def _get_user_id(self, context: CommandContext) -> str:
"""Extract user ID from context."""
# sender_id is like "!abcd1234" - convert to node_num
sender_id = context.sender_id
if sender_id.startswith("!"):
return str(int(sender_id[1:], 16))
return sender_id
def _format_time(self, hhmm: str) -> str:
"""Format HHMM as readable time."""
hours = int(hhmm[:2])
minutes = int(hhmm[2:])
period = "AM" if hours < 12 else "PM"
display_hour = hours % 12 or 12
return f"{display_hour}:{minutes:02d} {period}"
def _format_scope(self, scope_type: str, scope_value: str) -> str:
"""Format scope for display."""
if scope_type == "mesh" or not scope_value:
return "mesh "
return f"{scope_type} {scope_value} "
class UnsubCommand(CommandHandler):
"""Unsubscribe from reports or alerts."""
name = "unsub"
description = "Remove subscription(s)"
usage = "!unsub daily|weekly|alerts|all"
aliases = ["unsubscribe"]
def __init__(self, subscription_manager: "SubscriptionManager" = None):
self._sub_manager = subscription_manager
async def execute(self, args: str, context: CommandContext) -> str:
"""Handle unsubscribe command."""
if not self._sub_manager:
return "Subscriptions not available."
sub_type = args.strip().lower() if args else None
if not sub_type:
return "Usage: !unsub daily|weekly|alerts|all"
if sub_type not in ("daily", "weekly", "alerts", "all"):
return f"Invalid type '{sub_type}'. Use: daily, weekly, alerts, or all"
user_id = self._get_user_id(context)
removed = self._sub_manager.remove(user_id, sub_type if sub_type != "all" else None)
if removed == 0:
return "No subscriptions found to remove"
elif sub_type == "all":
return f"Removed all {removed} subscription(s)"
else:
return f"Removed {removed} {sub_type} subscription(s)"
def _get_user_id(self, context: CommandContext) -> str:
"""Extract user ID from context."""
sender_id = context.sender_id
if sender_id.startswith("!"):
return str(int(sender_id[1:], 16))
return sender_id
class MySubsCommand(CommandHandler):
"""List active subscriptions."""
name = "mysubs"
description = "List your subscriptions"
usage = "!mysubs"
aliases = ["subs"]
def __init__(self, subscription_manager: "SubscriptionManager" = None):
self._sub_manager = subscription_manager
async def execute(self, args: str, context: CommandContext) -> str:
"""List user's subscriptions."""
if not self._sub_manager:
return "Subscriptions not available."
user_id = self._get_user_id(context)
subs = self._sub_manager.get_user_subs(user_id)
if not subs:
return "No active subscriptions. Use !sub to subscribe."
lines = ["Your subscriptions:"]
for i, sub in enumerate(subs, 1):
lines.append(f" {i}. {self._format_sub(sub)}")
return "\n".join(lines)
def _format_sub(self, sub: dict) -> str:
"""Format a subscription for display."""
sub_type = sub["sub_type"]
scope_type = sub.get("scope_type", "mesh")
scope_value = sub.get("scope_value")
scope_desc = ""
if scope_type == "region" and scope_value:
scope_desc = f"region {scope_value} "
elif scope_type == "node" and scope_value:
scope_desc = f"node {scope_value} "
if sub_type == "daily":
time_str = self._format_time(sub.get("schedule_time", "0000"))
return f"Daily {scope_desc}report at {time_str}"
elif sub_type == "weekly":
time_str = self._format_time(sub.get("schedule_time", "0000"))
day_str = (sub.get("schedule_day") or "").capitalize()
return f"Weekly {scope_desc}report at {time_str} {day_str}"
else: # alerts
return f"Alerts for {scope_desc.strip() or 'mesh'}"
def _format_time(self, hhmm: str) -> str:
"""Format HHMM as readable time."""
if not hhmm or len(hhmm) != 4:
return hhmm
hours = int(hhmm[:2])
minutes = int(hhmm[2:])
period = "AM" if hours < 12 else "PM"
display_hour = hours % 12 or 12
return f"{display_hour}:{minutes:02d} {period}"
def _get_user_id(self, context: CommandContext) -> str:
"""Extract user ID from context."""
sender_id = context.sender_id
if sender_id.startswith("!"):
return str(int(sender_id[1:], 16))
return sender_id

View file

@ -39,9 +39,11 @@ class MeshAI:
self.context: Optional[MeshContext] = None self.context: Optional[MeshContext] = None
self.meshmonitor_sync = None self.meshmonitor_sync = None
self.knowledge = None self.knowledge = None
self.source_manager = None self.data_store = None # Replaces source_manager
self.health_engine = None self.health_engine = None
self.mesh_reporter = None self.mesh_reporter = None
self.subscription_manager = None
self._last_sub_check: float = 0.0
self.router: Optional[MessageRouter] = None self.router: Optional[MessageRouter] = None
self.responder: Optional[Responder] = None self.responder: Optional[Responder] = None
self._running = False self._running = False
@ -83,14 +85,20 @@ class MeshAI:
if self.meshmonitor_sync: if self.meshmonitor_sync:
self.meshmonitor_sync.maybe_refresh() self.meshmonitor_sync.maybe_refresh()
# Periodic mesh source refresh and health computation # Periodic data store refresh and health computation
if self.source_manager: if self.data_store:
refreshed = self.source_manager.refresh_all() refreshed = self.data_store.refresh()
# Recompute health after source refresh # Recompute health after refresh
if refreshed > 0 and self.health_engine: if refreshed and self.health_engine:
self.health_engine.compute(self.source_manager) self.health_engine.compute(self.data_store)
self._last_health_compute = time.time() self._last_health_compute = time.time()
# Check scheduled subscriptions (every 60 seconds)
if self.subscription_manager and self.mesh_reporter:
if time.time() - self._last_sub_check >= 60:
await self._check_scheduled_subs()
self._last_sub_check = time.time()
# Periodic cleanup # Periodic cleanup
if time.time() - self._last_cleanup >= 3600: if time.time() - self._last_cleanup >= 3600:
await self.history.cleanup_expired() await self.history.cleanup_expired()
@ -113,6 +121,10 @@ class MeshAI:
await self.llm.close() await self.llm.close()
if self.knowledge: if self.knowledge:
self.knowledge.close() self.knowledge.close()
if self.data_store:
self.data_store.close()
if self.subscription_manager:
self.subscription_manager.close()
self._remove_pid() self._remove_pid()
logger.info("MeshAI stopped") logger.info("MeshAI stopped")
@ -184,15 +196,22 @@ class MeshAI:
else: else:
self.meshmonitor_sync = None self.meshmonitor_sync = None
# Mesh data sources # Mesh data store (replaces MeshSourceManager)
enabled_sources = [s for s in self.config.mesh_sources if s.enabled] # mesh_sources may be dicts or MeshSourceConfig objects depending on config version
enabled_sources = [
s for s in self.config.mesh_sources
if (s.enabled if hasattr(s, 'enabled') else s.get('enabled', True))
]
if enabled_sources: if enabled_sources:
from .mesh_sources import MeshSourceManager from .mesh_data_store import MeshDataStore
self.source_manager = MeshSourceManager(enabled_sources) self.data_store = MeshDataStore(
# Initial fetch source_configs=enabled_sources,
self.source_manager.refresh_all() db_path="/data/mesh_history.db",
)
# Initial fetch and backfill
self.data_store.force_refresh()
# Log status # Log status
for status in self.source_manager.get_status(): for status in self.data_store.get_status():
if status["is_loaded"]: if status["is_loaded"]:
logger.info( logger.info(
f"Mesh source '{status['name']}' ({status['type']}): " f"Mesh source '{status['name']}' ({status['type']}): "
@ -204,11 +223,11 @@ class MeshAI:
f"failed - {status.get('last_error', 'unknown error')}" f"failed - {status.get('last_error', 'unknown error')}"
) )
else: else:
self.source_manager = None self.data_store = None
# Mesh health engine # Mesh health engine
mi_cfg = self.config.mesh_intelligence mi_cfg = self.config.mesh_intelligence
if mi_cfg.enabled and self.source_manager: if mi_cfg.enabled and self.data_store:
from .mesh_health import MeshHealthEngine from .mesh_health import MeshHealthEngine
self.health_engine = MeshHealthEngine( self.health_engine = MeshHealthEngine(
regions=mi_cfg.regions, regions=mi_cfg.regions,
@ -218,7 +237,7 @@ class MeshAI:
battery_warning_percent=mi_cfg.battery_warning_percent, battery_warning_percent=mi_cfg.battery_warning_percent,
) )
# Initial health computation # Initial health computation
mesh_health = self.health_engine.compute(self.source_manager) mesh_health = self.health_engine.compute(self.data_store)
self._last_health_compute = time.time() self._last_health_compute = time.time()
logger.info( logger.info(
f"Mesh intelligence enabled: {mesh_health.total_nodes} nodes, " f"Mesh intelligence enabled: {mesh_health.total_nodes} nodes, "
@ -229,21 +248,33 @@ class MeshAI:
self.health_engine = None self.health_engine = None
# Mesh reporter (for LLM prompt injection and commands) # Mesh reporter (for LLM prompt injection and commands)
if self.health_engine and self.source_manager: if self.health_engine and self.data_store:
from .mesh_reporter import MeshReporter from .mesh_reporter import MeshReporter
self.mesh_reporter = MeshReporter(self.health_engine, self.source_manager) self.mesh_reporter = MeshReporter(self.health_engine, self.data_store)
logger.info("Mesh reporter enabled") logger.info("Mesh reporter enabled")
else: else:
self.mesh_reporter = None self.mesh_reporter = None
# Knowledge base # Subscription manager (uses same db as data_store)
if self.data_store:
from .subscriptions import SubscriptionManager
self.subscription_manager = SubscriptionManager(db_path="/data/mesh_history.db")
logger.info("Subscription manager enabled")
else:
self.subscription_manager = None
# Knowledge base (optional - gracefully degrade if deps missing)
kb_cfg = self.config.knowledge kb_cfg = self.config.knowledge
if kb_cfg.enabled and kb_cfg.db_path: if kb_cfg.enabled and kb_cfg.db_path:
from .knowledge import KnowledgeSearch try:
self.knowledge = KnowledgeSearch( from .knowledge import KnowledgeSearch
db_path=kb_cfg.db_path, self.knowledge = KnowledgeSearch(
top_k=kb_cfg.top_k, db_path=kb_cfg.db_path,
) top_k=kb_cfg.top_k,
)
except ImportError as e:
logger.warning(f"Knowledge base disabled - missing dependencies: {e}")
self.knowledge = None
else: else:
self.knowledge = None self.knowledge = None
@ -253,6 +284,9 @@ class MeshAI:
disabled_commands=self.config.commands.disabled_commands, disabled_commands=self.config.commands.disabled_commands,
custom_commands=self.config.commands.custom_commands, custom_commands=self.config.commands.custom_commands,
mesh_reporter=self.mesh_reporter, mesh_reporter=self.mesh_reporter,
data_store=self.data_store,
health_engine=self.health_engine,
subscription_manager=self.subscription_manager,
) )
# Message router # Message router
@ -261,7 +295,7 @@ class MeshAI:
context=self.context, context=self.context,
meshmonitor_sync=self.meshmonitor_sync, meshmonitor_sync=self.meshmonitor_sync,
knowledge=self.knowledge, knowledge=self.knowledge,
source_manager=self.source_manager, data_store=self.data_store,
health_engine=self.health_engine, health_engine=self.health_engine,
mesh_reporter=self.mesh_reporter, mesh_reporter=self.mesh_reporter,
) )
@ -373,6 +407,80 @@ class MeshAI:
if pid_file.exists(): if pid_file.exists():
pid_file.unlink() pid_file.unlink()
async def _check_scheduled_subs(self) -> None:
"""Check for and deliver due scheduled reports."""
from datetime import datetime
from zoneinfo import ZoneInfo
tz = ZoneInfo("America/Boise")
now = datetime.now(tz)
current_hhmm = now.strftime("%H%M")
current_day = now.strftime("%a").lower()
due_subs = self.subscription_manager.get_due_subscriptions(current_hhmm, current_day)
for sub in due_subs:
try:
# Generate report based on scope
report = self._generate_sub_report(sub)
if not report:
continue
# Send DM to subscriber
user_id = sub["user_id"]
await self._send_sub_dm(user_id, report)
# Mark as sent
self.subscription_manager.mark_sent(sub["id"])
logger.info(f"Delivered {sub['sub_type']} report to {user_id}")
except Exception as e:
logger.error(f"Error delivering subscription {sub['id']}: {e}")
def _generate_sub_report(self, sub: dict) -> str:
"""Generate report content for a subscription."""
if not self.mesh_reporter:
return None
sub_type = sub["sub_type"]
scope_type = sub.get("scope_type", "mesh")
scope_value = sub.get("scope_value")
if scope_type == "region" and scope_value:
# Region-scoped report
region = self.mesh_reporter._find_region(scope_value)
if region:
return self.mesh_reporter.build_region_compact(region.name)
return None
elif scope_type == "node" and scope_value:
# Node-scoped report
return self.mesh_reporter.build_node_compact(scope_value)
else:
# Mesh-wide report
return self.mesh_reporter.build_lora_compact(scope="mesh")
async def _send_sub_dm(self, node_num: str, message: str) -> None:
"""Send a subscription DM to a node."""
if not self.connector:
return
# Convert node_num to destination format
try:
dest = int(node_num)
except ValueError:
dest = node_num
# Send via responder for proper chunking
if self.responder:
await self.responder.send_response(
message,
destination=dest,
channel=0, # DM channel
)
else:
# Fallback to direct send
self.connector.send_message(message, destination=dest)
def setup_logging(verbose: bool = False) -> None: def setup_logging(verbose: bool = False) -> None:
"""Configure logging.""" """Configure logging."""

File diff suppressed because it is too large Load diff

278
meshai/subscriptions.py Normal file
View file

@ -0,0 +1,278 @@
"""Subscription management for scheduled reports and alerts."""
import logging
import sqlite3
import time
from typing import Optional
logger = logging.getLogger(__name__)
# Valid subscription types
VALID_SUB_TYPES = {"daily", "weekly", "alerts"}
VALID_DAYS = {"mon", "tue", "wed", "thu", "fri", "sat", "sun"}
VALID_SCOPE_TYPES = {"mesh", "region", "node"}
class SubscriptionManager:
"""Manages user subscriptions with SQLite storage."""
def __init__(self, db_path: str):
"""Initialize subscription manager.
Args:
db_path: Path to SQLite database (same as mesh_history.db)
"""
self._db_path = db_path
self._db: Optional[sqlite3.Connection] = None
self._init_db()
def _init_db(self):
"""Initialize database connection and schema."""
self._db = sqlite3.connect(self._db_path, check_same_thread=False)
self._db.row_factory = sqlite3.Row
self._db.executescript("""
CREATE TABLE IF NOT EXISTS subscriptions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id TEXT NOT NULL,
sub_type TEXT NOT NULL,
schedule_time TEXT,
schedule_day TEXT,
scope_type TEXT DEFAULT 'mesh',
scope_value TEXT,
created_at REAL NOT NULL,
last_sent REAL DEFAULT 0,
enabled INTEGER DEFAULT 1
);
CREATE INDEX IF NOT EXISTS idx_sub_user ON subscriptions(user_id);
CREATE INDEX IF NOT EXISTS idx_sub_type ON subscriptions(sub_type);
""")
self._db.commit()
logger.info("Subscription manager initialized")
def _row_to_dict(self, row: sqlite3.Row) -> dict:
"""Convert sqlite Row to dict."""
return dict(row)
def add(self, user_id: str, sub_type: str, schedule_time: str = None,
schedule_day: str = None, scope_type: str = "mesh",
scope_value: str = None) -> dict:
"""Add a subscription.
Args:
user_id: Subscriber node_num
sub_type: "daily", "weekly", or "alerts"
schedule_time: HHMM format (required for daily/weekly)
schedule_day: mon-sun (required for weekly)
scope_type: "mesh", "region", or "node"
scope_value: Region name or node identifier
Returns:
Created subscription dict
Raises:
ValueError: If validation fails
"""
# Validate sub_type
if sub_type not in VALID_SUB_TYPES:
raise ValueError(f"Invalid type '{sub_type}'. Use: daily, weekly, or alerts")
# Validate schedule_time for daily/weekly
if sub_type in ("daily", "weekly"):
if not schedule_time:
raise ValueError(f"Time required for {sub_type} subscription. Use HHMM format (e.g., 1830)")
if not self._validate_time(schedule_time):
raise ValueError("Invalid time format. Use HHMM (e.g., 1830 for 6:30 PM)")
# Validate schedule_day for weekly
if sub_type == "weekly":
if not schedule_day:
raise ValueError("Day required for weekly subscription. Use: mon, tue, wed, thu, fri, sat, sun")
if schedule_day.lower() not in VALID_DAYS:
raise ValueError("Invalid day. Use: mon, tue, wed, thu, fri, sat, sun")
schedule_day = schedule_day.lower()
# Validate scope_type
if scope_type not in VALID_SCOPE_TYPES:
raise ValueError(f"Invalid scope '{scope_type}'. Use: mesh, region, or node")
# Check for duplicates
existing = self._db.execute("""
SELECT id FROM subscriptions
WHERE user_id = ? AND sub_type = ? AND scope_type = ?
AND (scope_value = ? OR (scope_value IS NULL AND ? IS NULL))
AND enabled = 1
""", (user_id, sub_type, scope_type, scope_value, scope_value)).fetchone()
if existing:
scope_desc = f" for {scope_type} {scope_value}" if scope_value else ""
raise ValueError(f"Already subscribed to {sub_type}{scope_desc}")
# Insert subscription
now = time.time()
cursor = self._db.execute("""
INSERT INTO subscriptions (user_id, sub_type, schedule_time, schedule_day,
scope_type, scope_value, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (user_id, sub_type, schedule_time, schedule_day, scope_type, scope_value, now))
self._db.commit()
sub_id = cursor.lastrowid
return self._get_by_id(sub_id)
def _validate_time(self, time_str: str) -> bool:
"""Validate HHMM time format."""
if not time_str or len(time_str) != 4 or not time_str.isdigit():
return False
hours = int(time_str[:2])
minutes = int(time_str[2:])
return 0 <= hours <= 23 and 0 <= minutes <= 59
def _get_by_id(self, sub_id: int) -> dict:
"""Get subscription by ID."""
row = self._db.execute(
"SELECT * FROM subscriptions WHERE id = ?", (sub_id,)
).fetchone()
return self._row_to_dict(row) if row else None
def remove(self, user_id: str, sub_type: str = None) -> int:
"""Remove subscription(s).
Args:
user_id: Subscriber node_num
sub_type: "daily", "weekly", "alerts", or None for all
Returns:
Number of subscriptions removed
"""
if sub_type and sub_type != "all":
cursor = self._db.execute(
"DELETE FROM subscriptions WHERE user_id = ? AND sub_type = ?",
(user_id, sub_type)
)
else:
cursor = self._db.execute(
"DELETE FROM subscriptions WHERE user_id = ?",
(user_id,)
)
self._db.commit()
return cursor.rowcount
def get_user_subs(self, user_id: str) -> list[dict]:
"""Get all subscriptions for a user."""
rows = self._db.execute(
"SELECT * FROM subscriptions WHERE user_id = ? AND enabled = 1 ORDER BY created_at",
(user_id,)
).fetchall()
return [self._row_to_dict(r) for r in rows]
def get_due_subscriptions(self, current_time_hhmm: str, current_day: str) -> list[dict]:
"""Get subscriptions that should fire right now.
Args:
current_time_hhmm: Current time as "HHMM" (e.g., "1830")
current_day: Current day as 3-letter lowercase (e.g., "sun")
Returns:
List of subscription dicts that are due
"""
now = time.time()
due = []
# Get all daily/weekly subscriptions
rows = self._db.execute("""
SELECT * FROM subscriptions
WHERE sub_type IN ('daily', 'weekly') AND enabled = 1
""").fetchall()
current_minutes = int(current_time_hhmm[:2]) * 60 + int(current_time_hhmm[2:])
for row in rows:
sub = self._row_to_dict(row)
schedule_time = sub.get("schedule_time")
if not schedule_time:
continue
schedule_minutes = int(schedule_time[:2]) * 60 + int(schedule_time[2:])
# 5-minute matching window
if abs(schedule_minutes - current_minutes) > 5:
continue
sub_type = sub["sub_type"]
last_sent = sub.get("last_sent", 0) or 0
if sub_type == "daily":
# Don't fire if sent within last 23 hours
if now - last_sent < 23 * 3600:
continue
due.append(sub)
elif sub_type == "weekly":
# Check day matches
schedule_day = sub.get("schedule_day", "").lower()
if schedule_day != current_day.lower():
continue
# Don't fire if sent within last 6 days
if now - last_sent < 6 * 24 * 3600:
continue
due.append(sub)
return due
def get_alert_subscribers(self, scope_type: str = None, scope_value: str = None) -> list[dict]:
"""Get users subscribed to alerts matching a scope.
Args:
scope_type: "mesh", "region", or "node"
scope_value: Region name or node identifier
Returns:
List of subscription dicts where scope matches
"""
# Get all alert subscriptions
rows = self._db.execute("""
SELECT * FROM subscriptions
WHERE sub_type = 'alerts' AND enabled = 1
""").fetchall()
matching = []
for row in rows:
sub = self._row_to_dict(row)
sub_scope = sub.get("scope_type", "mesh")
sub_value = sub.get("scope_value")
# Mesh scope gets ALL alerts
if sub_scope == "mesh":
matching.append(sub)
# Region scope gets alerts for that region
elif sub_scope == "region" and scope_type == "region":
if sub_value and scope_value and sub_value.lower() == scope_value.lower():
matching.append(sub)
# Node scope gets alerts for that node
elif sub_scope == "node" and scope_type == "node":
if sub_value and scope_value and sub_value.lower() == scope_value.lower():
matching.append(sub)
return matching
def mark_sent(self, subscription_id: int):
"""Update last_sent timestamp to now."""
self._db.execute(
"UPDATE subscriptions SET last_sent = ? WHERE id = ?",
(time.time(), subscription_id)
)
self._db.commit()
def get_all_subs(self) -> list[dict]:
"""Get all subscriptions (for admin view)."""
rows = self._db.execute(
"SELECT * FROM subscriptions WHERE enabled = 1 ORDER BY user_id, created_at"
).fetchall()
return [self._row_to_dict(r) for r in rows]
def close(self):
"""Close database connection."""
if self._db:
self._db.close()
self._db = None