fix: Scope detection, follow-up context, utilization calculation, duplicate disambiguation

- router.py: Fixed region scope detection to match longest region name first
- router.py: Added region abbreviations (SCID, SWID, etc.) for quick matching
- router.py: Added city name mapping (Boise -> South Western ID, etc.)
- router.py: Fixed node longname matching (case-insensitive substring)
- router.py: Added follow-up message context tracking (_user_mesh_context)
- router.py: Added more mesh keywords (noisy, traffic, packets, etc.)
- mesh_reporter.py: Added disambiguation for duplicate shortnames in region detail
- mesh_health.py: Added util_data_available flag to track packet data presence
- mesh_health.py: Passes has_packet_data through score computation

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
K7ZVX 2026-05-04 20:56:54 +00:00
commit df197cc395
3 changed files with 301 additions and 51 deletions

View file

@ -59,6 +59,9 @@ class HealthScore:
battery_warnings: int = 0
solar_index: float = 100.0
# Flag to indicate if utilization data is available
util_data_available: bool = False
@property
def composite(self) -> float:
"""Calculate weighted composite score."""
@ -251,7 +254,7 @@ class MeshHealthEngine:
all_telemetry = source_manager.get_all_telemetry()
all_packets = []
# Get packets from MeshMonitor sources
# Get packets from MeshMonitor sources (if available)
for status in source_manager.get_status():
if status["type"] == "meshmonitor":
src = source_manager.get_source(status["name"])
@ -261,6 +264,9 @@ class MeshHealthEngine:
tagged["_source"] = status["name"]
all_packets.append(tagged)
# Track if we have packet data for utilization calculation
has_packet_data = len(all_packets) > 0
# Build node health records
nodes: dict[str, NodeHealth] = {}
for node in all_nodes:
@ -486,10 +492,10 @@ class MeshHealthEngine:
if n["id"] in nodes:
nodes[n["id"]].locality = locality.name
# Compute scores at each level
self._compute_locality_scores(regions, nodes)
self._compute_region_scores(regions, nodes)
mesh_score = self._compute_mesh_score(regions, nodes)
# Compute scores at each level (pass packet data availability flag)
self._compute_locality_scores(regions, nodes, has_packet_data)
self._compute_region_scores(regions, nodes, has_packet_data)
mesh_score = self._compute_mesh_score(regions, nodes, has_packet_data)
# Build result
mesh_health = MeshHealth(
@ -512,37 +518,45 @@ class MeshHealthEngine:
self,
regions: list[RegionHealth],
nodes: dict[str, NodeHealth],
has_packet_data: bool = False,
) -> None:
"""Compute health scores for each locality."""
for region in regions:
for locality in region.localities:
locality_nodes = [nodes[nid] for nid in locality.node_ids if nid in nodes]
locality.score = self._compute_node_group_score(locality_nodes)
locality.score = self._compute_node_group_score(locality_nodes, has_packet_data)
def _compute_region_scores(
self,
regions: list[RegionHealth],
nodes: dict[str, NodeHealth],
has_packet_data: bool = False,
) -> None:
"""Compute health scores for each region."""
for region in regions:
region_nodes = [nodes[nid] for nid in region.node_ids if nid in nodes]
region.score = self._compute_node_group_score(region_nodes)
region.score = self._compute_node_group_score(region_nodes, has_packet_data)
def _compute_mesh_score(
self,
regions: list[RegionHealth],
nodes: dict[str, NodeHealth],
has_packet_data: bool = False,
) -> HealthScore:
"""Compute mesh-wide health score."""
all_nodes = list(nodes.values())
return self._compute_node_group_score(all_nodes)
return self._compute_node_group_score(all_nodes, has_packet_data)
def _compute_node_group_score(self, node_list: list[NodeHealth]) -> HealthScore:
def _compute_node_group_score(
self,
node_list: list[NodeHealth],
has_packet_data: bool = False,
) -> HealthScore:
"""Compute health score for a group of nodes.
Args:
node_list: List of NodeHealth objects
has_packet_data: Whether packet data is available for utilization calc
Returns:
HealthScore for the group
@ -560,7 +574,8 @@ class MeshHealthEngine:
else:
infra_score = 100.0 # No infrastructure = not penalized
# Channel utilization (simplified - based on packet counts)
# Channel utilization (based on packet counts if available)
if has_packet_data:
total_packets = sum(n.packet_count_24h for n in node_list)
baseline = len(node_list) * 500
if baseline > 0:
@ -578,6 +593,11 @@ class MeshHealthEngine:
util_score = 25.0
else:
util_score = 0.0
else:
# No packet data available - assume healthy utilization
# This prevents penalizing the score when we simply don't have data
util_percent = 0.0
util_score = 100.0
# Node behavior (flagged nodes)
flagged = [n for n in node_list if n.non_text_packets > self.packet_threshold]
@ -622,6 +642,7 @@ class MeshHealthEngine:
flagged_nodes=flagged_count,
battery_warnings=battery_warnings,
solar_index=solar_index,
util_data_available=has_packet_data,
)
def get_region(self, name: str) -> Optional[RegionHealth]:
@ -675,3 +696,4 @@ class MeshHealthEngine:
n for n in self._mesh_health.nodes.values()
if n.battery_percent is not None and n.battery_percent < self.battery_warning_percent
]

View file

@ -70,7 +70,10 @@ class MeshReporter:
# Utilization
util = score.util_percent
if util < 15:
util_data_available = getattr(score, 'util_data_available', False)
if not util_data_available:
util_label = "N/A - no packet data"
elif util < 15:
util_label = "Low"
elif util < 20:
util_label = "Moderate"
@ -183,16 +186,36 @@ class MeshReporter:
f"Infrastructure ({rs.infra_online}/{rs.infra_total}):",
]
# List infrastructure nodes
# Collect infrastructure nodes and detect duplicate shortnames
infra_nodes = []
for nid in region.node_ids:
node = health.nodes.get(nid)
if not node or not node.is_infrastructure:
continue
if node and node.is_infrastructure:
infra_nodes.append((nid, node))
# Count shortname occurrences to detect duplicates
shortname_counts: dict[str, int] = {}
for nid, node in infra_nodes:
sn = node.short_name or nid[:4]
shortname_counts[sn] = shortname_counts.get(sn, 0) + 1
# List infrastructure nodes with disambiguation for duplicates
for nid, node in infra_nodes:
status = "+" if node.is_online else "X"
age = _format_age(node.last_seen)
bat = f", bat {node.battery_percent:.0f}%" if node.battery_percent else ""
role = node.role or "ROUTER"
lines.append(f" {status} {node.short_name or nid[:4]} ({role}) - last seen {age}{bat}")
sn = node.short_name or nid[:4]
# Disambiguate duplicate shortnames with node ID suffix
if shortname_counts.get(sn, 0) > 1:
# Use last 4 chars of node_id as disambiguator
disambig = f", !{nid[-8:]}" if len(nid) >= 8 else f", {nid}"
name_str = f"{sn} ({role}{disambig})"
else:
name_str = f"{sn} ({role})"
lines.append(f" {status} {name_str} - last seen {age}{bat}")
if not node.is_online:
lines[-1] += " <- OFFLINE"
@ -543,3 +566,4 @@ class MeshReporter:
lines.append(f" {region.name}: {s.composite:.0f}/100{flag}")
return "\n".join(lines)

View file

@ -60,6 +60,9 @@ _MESH_KEYWORDS = {
"hop", "optimize", "optimization", "infrastructure", "infra", "relay",
"repeater", "region", "locality", "congestion", "collision", "airtime",
"telemetry", "firmware", "subscribe", "alert", "snr", "rssi",
# Additional keywords for better detection
"noisy", "noisiest", "traffic", "packets", "power", "routers",
"repeaters", "regions", "localities", "score", "status",
}
# Phrases that indicate mesh questions
@ -73,8 +76,53 @@ _MESH_PHRASES = [
"node status",
"network health",
"mesh health",
"which node",
"which nodes",
"which infra",
"list nodes",
"list infra",
"tell me about",
"what about",
"how is",
"how are",
]
# City name to region mapping (hardcoded fallback)
_CITY_TO_REGION = {
# Idaho
"twin falls": "South Central ID",
"boise": "South Western ID",
"nampa": "South Western ID",
"meridian": "South Western ID",
"caldwell": "South Western ID",
"idaho falls": "South Eastern ID",
"pocatello": "South Eastern ID",
"coeur d'alene": "Northern ID",
"cda": "Northern ID",
"post falls": "Northern ID",
"moscow": "Northern ID",
"lewiston": "Northern ID",
"salmon": "Central ID",
"sun valley": "Central ID",
"ketchum": "Central ID",
# Utah
"ogden": "Northern UT",
"logan": "Northern UT",
"salt lake": "Central UT",
"salt lake city": "Central UT",
"slc": "Central UT",
"provo": "Central UT",
"orem": "Central UT",
"vernal": "Eastern UT",
"moab": "Eastern UT",
"price": "Eastern UT",
"tooele": "Western UT",
"wendover": "Western UT",
"st george": "Southern UT",
"st. george": "Southern UT",
"cedar city": "Southern UT",
}
# Mesh awareness instruction for LLM
_MESH_AWARENESS_PROMPT = """
When the user asks about mesh health, network status, or optimization:
@ -88,6 +136,46 @@ When the user asks about mesh health, network status, or optimization:
"""
def _build_region_abbreviations(region_names: list[str]) -> dict[str, str]:
"""Build abbreviation to region name mapping.
Generates abbreviations like:
- "South Central ID" -> "SCID", "SC-ID", "SC ID"
- "South Western ID" -> "SWID", "SW-ID", "SW ID"
Args:
region_names: List of full region names
Returns:
Dict mapping lowercase abbreviation to full region name
"""
abbrevs = {}
for name in region_names:
parts = name.replace("???", "-").replace("???", "-").split()
if not parts:
continue
# Get first letter of each word (uppercase)
initials = "".join(p[0].upper() for p in parts if p)
abbrevs[initials.lower()] = name
# If last part is a state abbrev (2 chars), create variants
if len(parts) >= 2:
last = parts[-1]
if len(last) == 2 and last.isupper():
# "South Central ID" -> prefix is "South Central"
prefix_parts = parts[:-1]
prefix_initials = "".join(p[0].upper() for p in prefix_parts)
# SC-ID, SC ID, SCID variants
abbrevs[f"{prefix_initials.lower()}-{last.lower()}"] = name
abbrevs[f"{prefix_initials.lower()} {last.lower()}"] = name
abbrevs[f"{prefix_initials.lower()}{last.lower()}"] = name
return abbrevs
class MessageRouter:
"""Routes incoming messages to appropriate handlers."""
@ -118,6 +206,17 @@ class MessageRouter:
self.mesh_reporter = mesh_reporter
self.continuations = ContinuationState(max_continuations=3)
# Per-user mesh context tracking for follow-up handling
# Maps user_id -> {"last_was_mesh": bool, "last_scope": (type, value), "non_mesh_count": int}
self._user_mesh_context: dict[str, dict] = {}
# Build region abbreviation map
self._region_abbrevs: dict[str, str] = {}
if self.health_engine and self.health_engine.regions:
region_names = [r.name for r in self.health_engine.regions]
self._region_abbrevs = _build_region_abbreviations(region_names)
logger.debug(f"Built region abbreviations: {self._region_abbrevs}")
def should_respond(self, message: MeshMessage) -> bool:
"""Determine if we should respond to this message.
@ -241,38 +340,122 @@ class MessageRouter:
"""
msg_lower = message.lower()
# Check for node references
# === NODE MATCHING (check first - more specific) ===
if self.health_engine and self.health_engine.mesh_health:
health = self.health_engine.mesh_health
# Look for node shortnames (4 chars, case-insensitive)
# 1. Exact shortname match (case-insensitive, word boundary)
for node in health.nodes.values():
if node.short_name:
# Check if shortname appears as a word in message
pattern = r'\b' + re.escape(node.short_name.lower()) + r'\b'
if re.search(pattern, msg_lower):
return ("node", node.short_name)
# Check longname substring
if node.long_name and node.long_name.lower() in msg_lower:
# 2. Longname substring match (case-insensitive)
for node in health.nodes.values():
if node.long_name and len(node.long_name) > 3:
# Match significant portion of longname
if node.long_name.lower() in msg_lower:
return ("node", node.short_name or node.node_id)
# Also try matching without common suffixes like "Router", "Repeater"
clean_name = node.long_name.lower()
for suffix in [" router", " repeater", " relay", " base", " v2", " - g2"]:
clean_name = clean_name.replace(suffix, "")
if len(clean_name) > 4 and clean_name in msg_lower:
return ("node", node.short_name or node.node_id)
# Check for region references
# 3. NodeId hex match (with or without ! prefix)
hex_pattern = r'!?([0-9a-f]{8})'
hex_match = re.search(hex_pattern, msg_lower)
if hex_match:
hex_id = hex_match.group(1)
for nid, node in health.nodes.items():
if hex_id in nid.lower():
return ("node", node.short_name or nid)
# 4. NodeNum decimal match
num_pattern = r'\b(\d{9,10})\b'
num_match = re.search(num_pattern, message)
if num_match:
node_num = int(num_match.group(1))
hex_id = format(node_num, 'x')
for nid, node in health.nodes.items():
if hex_id in nid.lower():
return ("node", node.short_name or nid)
# === REGION MATCHING ===
if self.health_engine:
for anchor in self.health_engine.regions:
# 1. Check abbreviations first (SCID, SWID, etc.)
for abbrev, region_name in self._region_abbrevs.items():
# Match as word boundary
pattern = r'\b' + re.escape(abbrev) + r'\b'
if re.search(pattern, msg_lower):
return ("region", region_name)
# 2. Check city names
for city, region_name in _CITY_TO_REGION.items():
if city in msg_lower:
return ("region", region_name)
# 3. Full region name matching (SORTED BY LENGTH - longest first)
regions_by_length = sorted(
self.health_engine.regions,
key=lambda r: len(r.name),
reverse=True
)
for anchor in regions_by_length:
anchor_lower = anchor.name.lower()
# Check region name
# Check full region name
if anchor_lower in msg_lower:
return ("region", anchor.name)
# Check parts of region name (e.g., "wood river" matches "Wood River - ID")
parts = anchor_lower.replace("-", " ").replace("", " ").split()
for part in parts:
if len(part) > 3 and part in msg_lower:
# 4. Partial region name matching (also longest first)
for anchor in regions_by_length:
anchor_lower = anchor.name.lower()
# Check significant parts of region name
# Split on common separators
parts = anchor_lower.replace("-", " ").replace("???", " ").replace("???", " ").split()
# Only match on significant words (>3 chars, not state abbrevs)
significant_parts = [p for p in parts if len(p) > 3]
# Check if ALL significant parts appear in message
if significant_parts and all(p in msg_lower for p in significant_parts):
return ("region", anchor.name)
return ("mesh", None)
def _get_user_mesh_context(self, user_id: str) -> dict:
"""Get or create mesh context for a user."""
if user_id not in self._user_mesh_context:
self._user_mesh_context[user_id] = {
"last_was_mesh": False,
"last_scope": ("mesh", None),
"non_mesh_count": 0,
}
return self._user_mesh_context[user_id]
def _update_user_mesh_context(
self,
user_id: str,
is_mesh: bool,
scope: tuple[str, Optional[str]] = None,
) -> None:
"""Update mesh context tracking for a user."""
ctx = self._get_user_mesh_context(user_id)
if is_mesh:
ctx["last_was_mesh"] = True
ctx["non_mesh_count"] = 0
if scope:
ctx["last_scope"] = scope
else:
ctx["non_mesh_count"] += 1
# Reset after 2 consecutive non-mesh messages
if ctx["non_mesh_count"] >= 2:
ctx["last_was_mesh"] = False
ctx["last_scope"] = ("mesh", None)
async def generate_llm_response(self, message: MeshMessage, query: str) -> str:
"""Generate LLM response for a message.
@ -320,7 +503,7 @@ class MessageRouter:
"\n\nMESHMONITOR: You run alongside MeshMonitor (by Yeraze) on the same "
"meshtasticd node. MeshMonitor handles web dashboard, maps, telemetry, "
"traceroutes, security scanning, and auto-responder commands. Its trigger "
"commands are listed below if someone asks what commands are available, "
"commands are listed below ??? if someone asks what commands are available, "
"mention both yours and MeshMonitor's. If someone asks where to get "
"MeshMonitor, direct them to github.com/Yeraze/meshmonitor"
)
@ -357,13 +540,23 @@ class MessageRouter:
)
# 6. Mesh Intelligence (inject health data for mesh questions)
if (
self.source_manager
and self.mesh_reporter
and self._is_mesh_question(query)
):
user_ctx = self._get_user_mesh_context(message.sender_id)
is_direct_mesh_question = self._is_mesh_question(query)
is_followup = user_ctx["last_was_mesh"] and not is_direct_mesh_question
should_inject_mesh = is_direct_mesh_question or is_followup
if self.source_manager and self.mesh_reporter and should_inject_mesh:
# Detect scope from current message
scope_type, scope_value = self._detect_mesh_scope(query)
# For follow-ups with no detected scope, use previous scope
if is_followup and scope_type == "mesh" and scope_value is None:
prev_scope = user_ctx.get("last_scope", ("mesh", None))
if prev_scope[0] != "mesh" or prev_scope[1] is not None:
scope_type, scope_value = prev_scope
logger.debug(f"Using previous scope for follow-up: {scope_type}, {scope_value}")
# Always include Tier 1 summary for mesh questions
tier1 = self.mesh_reporter.build_tier1_summary()
system_prompt += "\n\n" + tier1
@ -384,6 +577,16 @@ class MessageRouter:
# Add mesh awareness instructions
system_prompt += _MESH_AWARENESS_PROMPT
# Update mesh context tracking
self._update_user_mesh_context(
message.sender_id,
is_mesh=True,
scope=(scope_type, scope_value),
)
else:
# Not a mesh question
self._update_user_mesh_context(message.sender_id, is_mesh=False)
# DEBUG: Log system prompt status
logger.debug(f"System prompt length: {len(system_prompt)} chars")
@ -470,3 +673,4 @@ class MessageRouter:
connector=self.connector,
history=self.history,
)