feat: add reviewer tool, lazy loading, manifest profiles, instinct learning, and hook profiles

Features:
- reviewer_tool: Multi-language code review (Python, JS/TS, Go, Rust, Java, C++, C#, PHP)
- lazy_loader: Context-triggered skill loading with 3-tier budget system
- manifest: Profile-based skill installation (coding, debugging, planning, shipping, research)
- instinct: Learning system that tracks patterns and predicts skill loading
- hook_profile: Security hook profiles (minimal, standard, strict, developer, rust-dev)
- hook_tool + hooks: Post-write hook execution system

Code Review Tool:
- Supports 9 languages with lint, typecheck, format, test, security checks
- Auto-detects language from file extensions
- Configurable tools per language (e.g., ruff, eslint, golangci-lint)

Lazy Loading:
- CONTEXT_TO_SKILLS mapping for 20+ context triggers
- Budget-aware loading (Tier 0: core, Tier 1: context, Tier 2: rare)
- Emergency mode at >90% context usage

Integration:
- Registered in model_tools.py and toolsets.py
- 8 language reviewers mapped in lazy_loader (python, go, rust, js, java, cpp, csharp, php)
This commit is contained in:
Your Name
2026-04-14 22:40:56 +08:00
parent 16f9d02084
commit c169d35c72
11 changed files with 3679 additions and 1 deletions

525
agent/hook_profile.py Normal file
View File

@@ -0,0 +1,525 @@
"""
Hook Profile System
Manages different security profiles for command hooks.
Users can switch between profiles to adjust the level of
security enforcement.
Inspired by gstack's hook patterns (careful/freeze/guard).
Usage:
from agent.hook_profile import HookProfileManager, get_profile_manager
# Get the manager
manager = get_profile_manager()
# Get current profile
profile = manager.get_active_profile()
# Switch to strict mode
manager.switch_profile("strict")
# Check if a command would be blocked
result = manager.check_command("rm -rf /")
"""
import yaml
import logging
import subprocess
from pathlib import Path
from typing import Dict, List, Optional, Any, Callable
from dataclasses import dataclass, field
from enum import Enum
from hermes_constants import get_hermes_home
logger = logging.getLogger(__name__)
class ProfileLevel(Enum):
"""Hook security profile levels."""
MINIMAL = "minimal"
STANDARD = "standard"
STRICT = "strict"
def __str__(self):
return self.value
@dataclass
class HookDefinition:
"""Definition of a hook."""
name: str
hook_type: str # "pre-command" or "post-write"
script_path: str
enabled: bool = True
config_path: Optional[str] = None # Optional YAML config
description: str = ""
blocks_on_match: bool = False # Does this hook block commands?
@property
def full_path(self) -> Path:
"""Get the full path to the hook script."""
return Path.home() / ".hermes" / "hooks" / self.hook_type / self.script_path
@dataclass
class HookProfile:
"""A named hook profile with associated hooks."""
name: str
level: ProfileLevel
description: str
hooks: List[str] # Hook names to enable
env_overrides: Dict[str, str] = field(default_factory=dict)
auto_activate: bool = False # Auto-activate based on context
@property
def hook_definitions(self) -> List[HookDefinition]:
"""Get HookDefinition objects for this profile's hooks."""
return [BUILTIN_HOOKS.get(h) for h in self.hooks if h in BUILTIN_HOOKS]
# =============================================================================
# Built-in Hook Profiles
# =============================================================================
BUILTIN_HOOKS: Dict[str, HookDefinition] = {
"careful": HookDefinition(
name="careful",
hook_type="pre-command",
script_path="careful-hook.sh",
blocks_on_match=False,
description="Warn about dangerous commands but allow execution",
),
"freeze": HookDefinition(
name="freeze",
hook_type="pre-command",
script_path="freeze-hook.sh",
blocks_on_match=True,
description="Block dangerous commands entirely",
),
"guard": HookDefinition(
name="guard",
hook_type="pre-command",
script_path="guard-hook.sh",
blocks_on_match=False,
description="Context-aware guard with learning",
),
"rust-fmt": HookDefinition(
name="rust-fmt",
hook_type="post-write",
script_path="rust-fmt-on-rs.yaml",
blocks_on_match=False,
description="Format Rust code on save",
),
"cargo-check": HookDefinition(
name="cargo-check",
hook_type="post-write",
script_path="cargo-check-on-rs.yaml",
blocks_on_match=True,
description="Check Rust code on save",
),
"go-fmt": HookDefinition(
name="go-fmt",
hook_type="post-write",
script_path="go-fmt-on-go.yaml",
blocks_on_match=False,
description="Format Go code on save",
),
"ruff-lint": HookDefinition(
name="ruff-lint",
hook_type="post-write",
script_path="ruff-lint-py.yaml",
blocks_on_match=True,
description="Lint Python code on save with ruff",
),
"pytest": HookDefinition(
name="pytest",
hook_type="post-write",
script_path="pytest-on-py.yaml",
blocks_on_match=True,
description="Run pytest on Python file change",
),
}
BUILTIN_PROFILES: Dict[str, HookProfile] = {
"minimal": HookProfile(
name="minimal",
level=ProfileLevel.MINIMAL,
description="No security hooks - use with caution",
hooks=[], # No hooks
env_overrides={"HERMES_HOOKS_ENABLED": "false"},
),
"standard": HookProfile(
name="standard",
level=ProfileLevel.STANDARD,
description="Warning-only hooks for dangerous commands",
hooks=["careful"],
env_overrides={"HERMES_HOOKS_ENABLED": "true"},
auto_activate=False,
),
"strict": HookProfile(
name="strict",
level=ProfileLevel.STRICT,
description="Block dangerous commands - recommended for production",
hooks=["careful", "freeze"],
env_overrides={"HERMES_HOOKS_ENABLED": "true"},
auto_activate=False,
),
"developer": HookProfile(
name="developer",
level=ProfileLevel.STANDARD,
description="Standard hooks + code quality checks (Rust/Go/Python)",
hooks=["careful", "ruff-lint", "pytest", "go-fmt"],
env_overrides={"HERMES_HOOKS_ENABLED": "true"},
auto_activate=False,
),
"rust-dev": HookProfile(
name="rust-dev",
level=ProfileLevel.STANDARD,
description="Rust development profile with formatting and checks",
hooks=["careful", "rust-fmt", "cargo-check"],
env_overrides={"HERMES_HOOKS_ENABLED": "true"},
auto_activate=False,
),
}
# =============================================================================
# Hook Profile Manager
# =============================================================================
class HookProfileManager:
"""
Manages hook security profiles.
Allows switching between different hook configurations
to adjust the level of security enforcement.
"""
def __init__(self, config_path: Optional[Path] = None):
"""
Initialize the hook profile manager.
Args:
config_path: Path to store profile config.
Defaults to ~/.hermes/hook-profile.yaml
"""
if config_path is None:
config_path = get_hermes_home() / "hook-profile.yaml"
self.config_path = config_path
self._active_profile_name: Optional[str] = "standard"
self._custom_profiles: Dict[str, HookProfile] = {}
self._load()
def _load(self) -> None:
"""Load profile configuration from disk."""
if self.config_path.exists():
try:
data = yaml.safe_load(self.config_path.read_text())
self._active_profile_name = data.get("active_profile", "standard")
# Load custom profiles
custom = data.get("custom_profiles", {})
for name, profile_data in custom.items():
self._custom_profiles[name] = HookProfile(
name=profile_data.get("name", name),
level=ProfileLevel(profile_data.get("level", "standard")),
description=profile_data.get("description", ""),
hooks=profile_data.get("hooks", []),
env_overrides=profile_data.get("env_overrides", {}),
auto_activate=profile_data.get("auto_activate", False),
)
logger.info(f"Loaded hook profile config: active={self._active_profile_name}")
except Exception as e:
logger.warning(f"Failed to load hook profile config: {e}")
self._active_profile_name = "standard"
def _save(self) -> None:
"""Save profile configuration to disk."""
try:
data = {
"active_profile": self._active_profile_name,
"custom_profiles": {},
}
for name, profile in self._custom_profiles.items():
data["custom_profiles"][name] = {
"name": profile.name,
"level": profile.level.value,
"description": profile.description,
"hooks": profile.hooks,
"env_overrides": profile.env_overrides,
"auto_activate": profile.auto_activate,
}
self.config_path.parent.mkdir(parents=True, exist_ok=True)
self.config_path.write_text(yaml.dump(data))
logger.debug(f"Saved hook profile config")
except Exception as e:
logger.error(f"Failed to save hook profile config: {e}")
def get_active_profile(self) -> HookProfile:
"""Get the currently active profile."""
if self._active_profile_name in BUILTIN_PROFILES:
return BUILTIN_PROFILES[self._active_profile_name]
if self._active_profile_name in self._custom_profiles:
return self._custom_profiles[self._active_profile_name]
# Fallback to standard
return BUILTIN_PROFILES["standard"]
def get_profile(self, name: str) -> Optional[HookProfile]:
"""Get a profile by name."""
if name in BUILTIN_PROFILES:
return BUILTIN_PROFILES[name]
return self._custom_profiles.get(name)
def list_profiles(self) -> List[str]:
"""List all available profile names."""
profiles = list(BUILTIN_PROFILES.keys())
profiles.extend(self._custom_profiles.keys())
return profiles
def switch_profile(self, name: str) -> bool:
"""
Switch to a different hook profile.
Args:
name: Profile name to switch to
Returns:
True if switch successful, False if profile doesn't exist
"""
if name not in BUILTIN_PROFILES and name not in self._custom_profiles:
logger.warning(f"Hook profile '{name}' not found")
return False
old_profile = self._active_profile_name
self._active_profile_name = name
self._save()
logger.info(f"Switched hook profile: {old_profile}{name}")
return True
def check_command(self, command: str, full_command: Optional[str] = None) -> Dict[str, Any]:
"""
Check a command against the active profile's hooks.
Args:
command: The command to check
full_command: Optional full command with args
Returns:
Dict with keys:
- allowed: bool
- blocked: bool
- warned: bool
- messages: List[str]
- hook_name: str or None
"""
if full_command is None:
full_command = command
profile = self.get_active_profile()
result = {
"allowed": True,
"blocked": False,
"warned": False,
"messages": [],
"hook_name": None,
}
for hook_name in profile.hooks:
hook_def = BUILTIN_HOOKS.get(hook_name)
if not hook_def or not hook_def.enabled:
continue
hook_result = self._run_hook(hook_def, command, full_command)
if hook_result is None:
continue
if hook_result["blocked"]:
result["blocked"] = True
result["allowed"] = False
result["hook_name"] = hook_name
result["messages"].extend(hook_result["messages"])
break
if hook_result["warned"]:
result["warned"] = True
result["messages"].extend(hook_result["messages"])
return result
def _run_hook(self, hook: HookDefinition, command: str, full_command: str) -> Optional[Dict[str, Any]]:
"""
Run a single hook and return the result.
Args:
hook: Hook definition
command: Short command (first word)
full_command: Full command string
Returns:
Dict with blocked/warned/messages, or None if hook not found
"""
if not hook.full_path.exists():
logger.warning(f"Hook script not found: {hook.full_path}")
return None
try:
proc = subprocess.run(
[str(hook.full_path), command, full_command],
capture_output=True,
text=True,
timeout=5,
)
output = proc.stdout.strip()
stderr = proc.stderr.strip()
if output:
messages = output.split("\n")
else:
messages = []
if stderr:
messages.append(f"STDERR: {stderr}")
return {
"blocked": proc.returncode != 0 and hook.blocks_on_match,
"warned": proc.returncode == 0 and bool(output),
"messages": messages,
}
except subprocess.TimeoutExpired:
logger.error(f"Hook timed out: {hook.name}")
return {"blocked": False, "warned": True, "messages": ["Hook timed out"]}
except Exception as e:
logger.error(f"Hook failed: {hook.name}: {e}")
return {"blocked": False, "warned": True, "messages": [f"Hook error: {e}"]}
def add_profile(self, profile: HookProfile) -> None:
"""
Add a custom profile.
Args:
profile: Profile to add
"""
self._custom_profiles[profile.name] = profile
self._save()
def remove_profile(self, name: str) -> bool:
"""
Remove a custom profile.
Args:
name: Profile name to remove
Returns:
True if removed, False if not found or is builtin
"""
if name in BUILTIN_PROFILES:
logger.warning(f"Cannot remove builtin profile: {name}")
return False
if name in self._custom_profiles:
del self._custom_profiles[name]
if self._active_profile_name == name:
self._active_profile_name = "standard"
self._save()
return True
return False
def auto_select_profile(self, context: str) -> HookProfile:
"""
Automatically select profile based on context.
Args:
context: Conversation context
Returns:
Selected profile
"""
context_lower = context.lower()
# Check for keywords that suggest different profiles
if any(kw in context_lower for kw in ["production", "prod", "deploy", "release"]):
self.switch_profile("strict")
return self.get_active_profile()
if any(kw in context_lower for kw in ["develop", "dev", "local", "test"]):
self.switch_profile("developer")
return self.get_active_profile()
if any(kw in context_lower for kw in ["rust", "cargo", "rs "]):
self.switch_profile("rust-dev")
return self.get_active_profile()
# Default to current profile
return self.get_active_profile()
def get_enabled_hooks(self) -> List[HookDefinition]:
"""Get list of enabled hooks for the active profile."""
profile = self.get_active_profile()
enabled = []
for hook_name in profile.hooks:
hook_def = BUILTIN_HOOKS.get(hook_name)
if hook_def:
enabled.append(hook_def)
return enabled
# =============================================================================
# Singleton
# =============================================================================
_hook_profile_manager: Optional[HookProfileManager] = None
def get_profile_manager() -> HookProfileManager:
"""Get the singleton HookProfileManager instance."""
global _hook_profile_manager
if _hook_profile_manager is None:
_hook_profile_manager = HookProfileManager()
return _hook_profile_manager
def check_command(command: str, full_command: Optional[str] = None) -> Dict[str, Any]:
"""
Check a command against the active profile's hooks.
Args:
command: The command to check
full_command: Optional full command with args
Returns:
Dict with allowed/blocked/warned/messages
"""
return get_profile_manager().check_command(command, full_command)
def switch_hook_profile(name: str) -> bool:
"""
Switch to a different hook profile.
Args:
name: Profile name
Returns:
True if successful
"""
return get_profile_manager().switch_profile(name)
def get_active_hook_profile() -> HookProfile:
"""Get the currently active hook profile."""
return get_profile_manager().get_active_profile()

508
agent/instinct.py Normal file
View File

@@ -0,0 +1,508 @@
"""
Instinct Learning System
Learns from successful interactions to improve tool/skill loading.
Tracks patterns of what tools and skills are used together,
and uses these patterns to predict future loading needs.
Inspired by ECC's experience tracking.
Usage:
from agent.instinct import InstinctLearner, get_instinct_learner
# Get the learner singleton
learner = get_instinct_learner()
# Record a successful interaction
learner.record_interaction(context="debug python code", tools=["terminal", "read_file"], skills=["systematic-debugging"])
# Get predicted skills for context
predictions = learner.predict_for_context("my python is broken")
"""
import json
import logging
import time
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple, Any
from dataclasses import dataclass, field, asdict
from collections import defaultdict
from functools import lru_cache
from hermes_constants import get_hermes_home
logger = logging.getLogger(__name__)
# =============================================================================
# Data Structures
# =============================================================================
@dataclass
class InteractionRecord:
"""Record of a successful interaction."""
timestamp: float
context: str
context_hash: str # For quick lookup
tools: List[str]
skills: List[str]
success: bool = True
feedback: Optional[str] = None # Optional user feedback
@classmethod
def create(cls, context: str, tools: List[str], skills: List[str],
feedback: Optional[str] = None) -> "InteractionRecord":
import hashlib
context_hash = hashlib.md5(context.lower().encode()).hexdigest()[:12]
return cls(
timestamp=time.time(),
context=context,
context_hash=context_hash,
tools=sorted(tools),
skills=sorted(skills),
feedback=feedback,
)
@dataclass
class PatternStats:
"""Statistics for a context pattern."""
count: int = 0
total_success: int = 0
avg_tools: float = 0.0
avg_skills: float = 0.0
last_used: float = 0.0
# Tool/skill co-occurrence counts
tool_counts: Dict[str, int] = field(default_factory=dict)
skill_counts: Dict[str, int] = field(default_factory=dict)
@property
def success_rate(self) -> float:
if self.count == 0:
return 0.0
return self.total_success / self.count
def to_dict(self) -> Dict:
return {
"count": self.count,
"total_success": self.total_success,
"avg_tools": self.avg_tools,
"avg_skills": self.avg_skills,
"last_used": self.last_used,
"tool_counts": self.tool_counts,
"skill_counts": self.skill_counts,
}
@classmethod
def from_dict(cls, data: Dict) -> "PatternStats":
return cls(
count=data.get("count", 0),
total_success=data.get("total_success", 0),
avg_tools=data.get("avg_tools", 0.0),
avg_skills=data.get("avg_skills", 0.0),
last_used=data.get("last_used", 0.0),
tool_counts=data.get("tool_counts", {}),
skill_counts=data.get("skill_counts", {}),
)
@dataclass
class InstinctProfile:
"""
Learned instincts for a context pattern.
Created by aggregating InteractionRecords.
"""
pattern: str # The context pattern/key
tools: List[str] # Most commonly used tools
skills: List[str] # Most commonly used skills
confidence: float # 0-1 based on sample size
sample_size: int
last_used: float
def to_dict(self) -> Dict:
return {
"pattern": self.pattern,
"tools": self.tools,
"skills": self.skills,
"confidence": self.confidence,
"sample_size": self.sample_size,
"last_used": self.last_used,
}
# =============================================================================
# Instinct Learner
# =============================================================================
class InstinctLearner:
"""
Learns from interactions to predict tool/skill loading.
Tracks patterns and builds instincts that can be used
to preload tools and skills before they're explicitly needed.
"""
def __init__(self, storage_path: Optional[Path] = None):
"""
Initialize the instinct learner.
Args:
storage_path: Path to store interaction data.
Defaults to ~/.hermes/instinct/
"""
if storage_path is None:
storage_path = get_hermes_home() / "instinct"
self.storage_path = storage_path
self.storage_path.mkdir(parents=True, exist_ok=True)
self.interactions_file = storage_path / "interactions.jsonl"
self.patterns_file = storage_path / "patterns.json"
self.instincts_file = storage_path / "instincts.json"
# In-memory cache
self._patterns: Dict[str, PatternStats] = {}
self._instincts: Dict[str, InstinctProfile] = {}
self._context_hints: Dict[str, List[str]] = {} # hint -> [pattern]
# Load existing data
self._load()
def _load(self) -> None:
"""Load patterns and instincts from disk."""
# Load patterns
if self.patterns_file.exists():
try:
data = json.loads(self.patterns_file.read_text())
self._patterns = {
k: PatternStats.from_dict(v) for k, v in data.items()
}
logger.info(f"Loaded {len(self._patterns)} patterns")
except Exception as e:
logger.warning(f"Failed to load patterns: {e}")
# Load instincts
if self.instincts_file.exists():
try:
data = json.loads(self.instincts_file.read_text())
self._instincts = {
k: InstinctProfile(**v) for k, v in data.items()
}
logger.info(f"Loaded {len(self._instincts)} instincts")
except Exception as e:
logger.warning(f"Failed to load instincts: {e}")
def _save(self) -> None:
"""Save patterns and instincts to disk."""
try:
# Save patterns
patterns_data = {k: v.to_dict() for k, v in self._patterns.items()}
self.patterns_file.write_text(json.dumps(patterns_data, indent=2))
# Save instincts
instincts_data = {k: v.to_dict() for k, v in self._instincts.items()}
self.instincts_file.write_text(json.dumps(instincts_data, indent=2))
logger.debug(f"Saved {len(self._patterns)} patterns, {len(self._instincts)} instincts")
except Exception as e:
logger.error(f"Failed to save instinct data: {e}")
def record_interaction(
self,
context: str,
tools: List[str],
skills: List[str],
success: bool = True,
feedback: Optional[str] = None,
) -> None:
"""
Record a successful interaction.
Args:
context: The conversation context
tools: Tools that were used
skills: Skills that were loaded
success: Whether the interaction was successful
feedback: Optional user feedback
"""
import re
# Create interaction record
record = InteractionRecord.create(context, tools, skills, feedback)
# Append to interactions log
with open(self.interactions_file, "a") as f:
f.write(json.dumps(asdict(record)) + "\n")
# Extract context hints (keywords)
hints = self._extract_hints(context)
# Update patterns for each hint
for hint in hints:
if hint not in self._patterns:
self._patterns[hint] = PatternStats()
stats = self._patterns[hint]
stats.count += 1
if success:
stats.total_success += 1
# Update running averages
stats.avg_tools = (stats.avg_tools * (stats.count - 1) + len(tools)) / stats.count
stats.avg_skills = (stats.avg_skills * (stats.count - 1) + len(skills)) / stats.count
stats.last_used = record.timestamp
# Update co-occurrence counts
for tool in tools:
stats.tool_counts[tool] = stats.tool_counts.get(tool, 0) + 1
for skill in skills:
stats.skill_counts[skill] = stats.skill_counts.get(skill, 0) + 1
# Update hint index
for hint in hints:
if hint not in self._context_hints:
self._context_hints[hint] = []
if record.context_hash not in self._context_hints[hint]:
self._context_hints[hint].append(record.context_hash)
# Rebuild instincts for affected patterns
self._rebuild_instincts(hints)
# Save periodically (every 10 interactions)
if sum(p.count for p in self._patterns.values()) % 10 == 0:
self._save()
def _extract_hints(self, context: str) -> List[str]:
"""Extract meaningful hints from context."""
import re
context_lower = context.lower()
# Extract words (3+ chars)
words = re.findall(r'\b\w{3,}\b', context_lower)
# Filter out common stop words
stop_words = {
'the', 'and', 'for', 'are', 'but', 'not', 'you', 'all', 'can',
'her', 'was', 'one', 'our', 'out', 'day', 'get', 'has', 'him',
'his', 'how', 'its', 'may', 'new', 'now', 'old', 'see', 'two',
'who', 'boy', 'did', 'she', 'use', 'way', 'will', 'with', 'have',
'this', 'that', 'from', 'they', 'what', 'were', 'been', 'when',
'would', 'there', 'could', 'their', 'which', 'about', 'after',
'before', 'between', 'into', 'over', 'under', 'again', 'more',
'some', 'such', 'only', 'just', 'also', 'very', 'than', 'then',
'them', 'these', 'those', 'here', 'where', 'why', 'how', 'your',
}
hints = [w for w in words if w not in stop_words]
# Add bigrams for common patterns
bigrams = []
for i in range(len(words) - 1):
bigram = f"{words[i]}_{words[i+1]}"
bigrams.append(bigram)
hints.extend(bigrams)
return hints[:20] # Limit to top 20 hints
def _rebuild_instincts(self, hints: List[str]) -> None:
"""
Rebuild instincts for the given hints.
An instinct is a prediction for what tools/skills to load
based on learned patterns.
"""
for hint in hints:
stats = self._patterns.get(hint)
if not stats or stats.count < 2:
continue
# Calculate confidence based on sample size
confidence = min(1.0, stats.count / 10) # Max confidence at 10 samples
# Get top tools (appearing in >50% of interactions)
threshold = stats.count * 0.5
top_tools = [t for t, c in stats.tool_counts.items() if c >= threshold]
# Get top skills
top_skills = [s for s, c in stats.skill_counts.items() if c >= threshold]
if top_tools or top_skills:
self._instincts[hint] = InstinctProfile(
pattern=hint,
tools=top_tools[:5], # Max 5 tools
skills=top_skills[:5], # Max 5 skills
confidence=confidence,
sample_size=stats.count,
last_used=stats.last_used,
)
def predict_for_context(self, context: str, limit: int = 10) -> List[str]:
"""
Predict what to load for a context.
Args:
context: The conversation context
limit: Maximum number of predictions
Returns:
List of tool/skill paths to preload
"""
hints = self._extract_hints(context)
# Aggregate predictions from matching hints
tool_scores: Dict[str, float] = defaultdict(float)
skill_scores: Dict[str, float] = defaultdict(float)
for hint in hints:
instinct = self._instincts.get(hint)
if not instinct:
continue
# Weight by confidence
weight = instinct.confidence
for tool in instinct.tools:
tool_scores[tool] += weight
for skill in instinct.skills:
skill_scores[skill] += weight
# Combine and sort
predictions = []
for tool, score in sorted(tool_scores.items(), key=lambda x: -x[1])[:limit]:
predictions.append(tool)
for skill, score in sorted(skill_scores.items(), key=lambda x: -x[1])[:limit]:
if skill not in predictions:
predictions.append(skill)
return predictions[:limit]
def get_instinct_for_hint(self, hint: str) -> Optional[InstinctProfile]:
"""Get the instinct for a specific hint."""
return self._instincts.get(hint)
def get_stats(self) -> Dict[str, Any]:
"""Get learning statistics."""
total_interactions = sum(p.count for p in self._patterns.values())
return {
"total_patterns": len(self._patterns),
"total_instincts": len(self._instincts),
"total_interactions": total_interactions,
"avg_tools_per_interaction": sum(p.avg_tools for p in self._patterns.values()) / max(1, len(self._patterns)),
"avg_skills_per_interaction": sum(p.avg_skills for p in self._patterns.values()) / max(1, len(self._patterns)),
}
def clear_old_data(self, days: int = 30) -> int:
"""
Clear pattern data older than specified days.
Args:
days: Number of days to retain
Returns:
Number of patterns removed
"""
import time
cutoff = time.time() - (days * 86400)
removed = 0
for pattern in list(self._patterns.keys()):
if self._patterns[pattern].last_used < cutoff:
del self._patterns[pattern]
if pattern in self._instincts:
del self._instincts[pattern]
removed += 1
if removed > 0:
self._save()
return removed
def export_instincts(self) -> Dict[str, Any]:
"""Export all instincts for sharing/backup."""
return {
"version": "1.0.0",
"exported_at": time.time(),
"instincts": {k: v.to_dict() for k, v in self._instincts.items()},
"patterns": {k: v.to_dict() for k, v in self._patterns.items()},
}
def import_instincts(self, data: Dict) -> int:
"""
Import instincts from backup.
Args:
data: Exported instinct data
Returns:
Number of instincts imported
"""
count = 0
for k, v in data.get("instincts", {}).items():
try:
self._instincts[k] = InstinctProfile(**v)
count += 1
except Exception as e:
logger.warning(f"Failed to import instinct {k}: {e}")
for k, v in data.get("patterns", {}).items():
try:
self._patterns[k] = PatternStats.from_dict(v)
except Exception as e:
logger.warning(f"Failed to import pattern {k}: {e}")
if count > 0:
self._save()
return count
# =============================================================================
# Singleton
# =============================================================================
_instinct_learner: Optional[InstinctLearner] = None
def get_instinct_learner() -> InstinctLearner:
"""Get the singleton InstinctLearner instance."""
global _instinct_learner
if _instinct_learner is None:
_instinct_learner = InstinctLearner()
return _instinct_learner
def record_success(
context: str,
tools: List[str],
skills: List[str],
feedback: Optional[str] = None,
) -> None:
"""
Convenience function to record a successful interaction.
Args:
context: The conversation context
tools: Tools that were used
skills: Skills that were loaded
feedback: Optional user feedback
"""
get_instinct_learner().record_interaction(context, tools, skills, success=True, feedback=feedback)
def predict_loading(context: str, limit: int = 10) -> List[str]:
"""
Predict what to load for a context.
Args:
context: The conversation context
limit: Maximum number of predictions
Returns:
List of tool/skill paths to preload
"""
return get_instinct_learner().predict_for_context(context, limit)

648
agent/lazy_loader.py Normal file
View File

@@ -0,0 +1,648 @@
"""
Context-Aware Lazy Tool Loader
Implements tiered tool loading inspired by ECC's agent compression.
Only loads tools when needed based on conversation context.
Architecture:
- TIER_0 (Tools): Native functions from registry (terminal, file ops, etc.)
- TIER_1 (Skills): High-level skills loaded on context triggers
- TIER_2 (Rare Tools): Tools rarely needed, loaded on demand
Usage:
from agent.lazy_loader import LazyToolLoader, get_lazy_tool_loader
loader = get_lazy_tool_loader()
tools = loader.get_tools_for_context("I need to debug my Python code")
# Or use the helper function
from agent.lazy_loader import get_tools_budget_aware
tools = get_tools_budget_aware(context, context_usage=0.5)
"""
import re
import time
import logging
from typing import Dict, List, Optional, Set, Any, Callable, Tuple
from functools import lru_cache
from collections import defaultdict
logger = logging.getLogger(__name__)
# =============================================================================
# Tool Tiers (Native tools from registry)
# =============================================================================
# Core tools that are always loaded (Tier 0)
TIER_0_TOOLS = frozenset({
"terminal",
"read_file",
"write_file",
"patch",
"search_files",
"skills_list",
"skill_view",
"skill_manage",
"todo",
"memory",
"session_search",
"clarify",
})
# Tools that are pre-loaded based on context (Tier 1)
# These are actual TOOL NAMES from the registry
TIER_1_BY_CONTEXT = {
"code:python": ["terminal", "read_file"], # Basic file ops for Python
"code:go": ["terminal", "read_file"],
"code:rust": ["terminal", "read_file"],
"code:js": ["terminal", "read_file"],
"debug": ["terminal", "read_file", "patch"],
"test": ["terminal"],
"ship": ["terminal"],
"deploy": ["terminal"],
"plan": ["skills_list", "skill_view"],
"brainstorm": ["skills_list", "skill_view"],
"web": ["web_search", "web_extract", "browser_navigate"],
"browser": ["browser_navigate", "browser_snapshot", "browser_click", "browser_vision"],
"search": ["web_search", "search_files"],
"ml": ["terminal"],
"ai": ["terminal"],
"database": ["terminal"],
"docker": ["terminal"],
"git": ["terminal"],
}
# Tools that are always Tier 2 (loaded on demand only)
TIER_2_TOOLS = frozenset({
"cronjob",
"text_to_speech",
"delegate_task",
"mixture_of_agents",
"execute_code",
"image_generate",
"vision_analyze",
"process",
"send_message",
"rl_start_training",
"rl_stop_training",
"rl_list_runs",
"rl_list_environments",
})
# =============================================================================
# Skill Tiers (Context-triggered skills)
# =============================================================================
# Skills to load on context triggers (these are NOT tools, they're skill paths)
CONTEXT_TO_SKILLS = {
"code:python": [
"software-development/python-reviewer",
"software-development/build-error-resolver",
],
"code:go": [
"software-development/go-reviewer",
],
"code:rust": [
"software-development/rust-reviewer",
],
"code:js": [
"software-development/javascript-reviewer",
],
"code:java": [
"software-development/java-reviewer",
],
"code:cpp": [
"software-development/cpp-reviewer",
],
"code:csharp": [
"software-development/csharp-reviewer",
],
"code:php": [
"software-development/php-reviewer",
],
"debug": [
"software-development/systematic-debugging",
"software-development/build-error-resolver",
],
"review": [
"software-development/two-stage-review",
"software-development/python-reviewer",
],
"test": [
"software-development/test-driven-development",
],
"ship": [
"software-development/idempotent-ship",
],
"deploy": [
"software-development/idempotent-ship",
],
"plan": [
"software-development/writing-plans",
"software-development/brainstorming",
],
"feature": [
"software-development/writing-plans",
"software-development/brainstorming",
],
"brainstorm": [
"software-development/brainstorming",
],
"install": [
"software-development/hooks",
],
"hook": [
"software-development/hooks",
],
"ml": [
"research/llm-wiki",
"mlops/huggingface-hub",
],
"ai": [
"research/llm-wiki",
"mlops/huggingface-hub",
],
"model": [
"research/llm-wiki",
"mlops/huggingface-hub",
],
"refactor": [
"software-development/python-reviewer",
"software-development/two-stage-review",
],
"performance": [
"software-development/python-reviewer",
"software-development/systematic-debugging",
],
"security": [
"software-development/two-stage-review",
],
}
# =============================================================================
# Code Extensions & Keywords
# =============================================================================
CODE_EXTENSIONS = {
"py": "python",
"js": "javascript",
"ts": "typescript",
"go": "golang",
"rs": "rust",
"java": "java",
"cpp": "cpp",
"c": "c",
"rb": "ruby",
"php": "php",
"sh": "shell",
"bash": "shell",
"zsh": "shell",
"yaml": "yaml",
"yml": "yaml",
"json": "json",
"toml": "toml",
"md": "markdown",
"sql": "sql",
"html": "html",
"css": "css",
"cs": "csharp",
}
DOMAIN_KEYWORDS = {
"review", "debug", "test", "ship", "deploy", "build", "compile",
"install", "setup", "config", "api", "web", "browser", "search",
"database", "docker", "kubernetes", "k8s", "cloud", "aws", "gcp",
"azure", "linux", "windows", "macos", "android", "ios",
"python", "javascript", "typescript", "go", "rust", "java",
"ml", "ai", "machine learning", "deep learning", "nlp",
"git", "github", "gitlab", "ci", "cd", "pipeline",
"refactor", "performance", "security",
"write", "create", "implement", "feature", "feature request",
}
# =============================================================================
# LazyToolLoader
# =============================================================================
class LazyToolLoader:
"""
Context-aware tool loader with tiering and caching.
TIER_0: Always loaded (core tools from registry)
TIER_1: Context-preloaded (tools based on detected patterns)
TIER_2: On-demand loaded (rare tools)
Plus: Skill preloading based on context
"""
def __init__(self):
self._tier_0_cache: Optional[List[dict]] = None
self._tier_1_cache: Dict[str, List[dict]] = {}
self._tier_2_cache: Dict[str, dict] = {}
self._context_hints: Set[str] = set()
self._active_skills: Set[str] = set()
self._skill_content_cache: Dict[str, str] = {}
self._last_context_analysis: float = 0
self._cache_ttl: float = 60.0
self._tools_discovered: bool = False
def get_tools_for_context(
self,
context: str,
enabled_toolsets: Optional[List[str]] = None,
context_usage: float = 0.0,
) -> List[dict]:
"""
Get tool definitions optimized for the current context.
Args:
context: Conversation context (recent messages)
enabled_toolsets: Available toolsets to filter from
context_usage: Current context window usage (0.0-1.0)
Returns:
List of OpenAI-format tool definitions
"""
self._ensure_tools_discovered()
# Check if we need to re-analyze context
current_time = time.time()
if current_time - self._last_context_analysis > self._cache_ttl:
self._context_hints = self._analyze_context(context)
self._last_context_analysis = current_time
tools = []
seen = set()
# Tier 0: Always load core tools
if self._tier_0_cache is None:
self._tier_0_cache = self._load_tier_0_tools()
tools.extend(self._tier_0_cache)
for t in self._tier_0_cache:
seen.add(t["function"]["name"])
# Budget-aware Tier 1 loading
if context_usage < 0.8:
tier_1_tools = self._get_tier_1_tools()
for t in tier_1_tools:
if t["function"]["name"] not in seen:
tools.append(t)
seen.add(t["function"]["name"])
# Budget-aware Tier 2 exclusion
# (Tier 2 tools are never preloaded, only loaded on-demand)
return tools
def get_active_skills(self, context: str, context_usage: float = 0.0) -> List[str]:
"""
Get list of skill paths to load for the current context.
Args:
context: Conversation context
context_usage: Current context usage (0.0-1.0)
Returns:
List of skill paths to load
"""
current_time = time.time()
if current_time - self._last_context_analysis > self._cache_ttl:
self._context_hints = self._analyze_context(context)
self._last_context_analysis = current_time
skills = []
seen = set()
# Load skills based on context hints
for hint in self._context_hints:
if hint in CONTEXT_TO_SKILLS:
for skill_path in CONTEXT_TO_SKILLS[hint]:
if skill_path not in seen:
# Only load if budget allows
if context_usage < 0.85:
skills.append(skill_path)
seen.add(skill_path)
self._active_skills = seen
return skills
def get_skill_content(self, skill_path: str) -> str:
"""
Get skill content with caching.
Args:
skill_path: Path relative to skills dir (e.g., 'software-development/python-reviewer')
Returns:
Skill content or empty string if not found
"""
if skill_path not in self._skill_content_cache:
try:
from pathlib import Path
from hermes_constants import get_skills_dir
skill_file = get_skills_dir() / skill_path / "SKILL.md"
if skill_file.exists():
self._skill_content_cache[skill_path] = skill_file.read_text()
else:
self._skill_content_cache[skill_path] = ""
except Exception as e:
logger.debug(f"Failed to load skill {skill_path}: {e}")
self._skill_content_cache[skill_path] = ""
return self._skill_content_cache[skill_path]
def preload_skills_for_context(
self,
context: str,
context_usage: float = 0.0,
max_tokens: int = 8000
) -> str:
"""
Preload skill content for the current context.
Returns combined skill content as a string for injection into context.
Args:
context: Conversation context
context_usage: Current context usage
max_tokens: Maximum tokens to use for skill content
Returns:
Combined skill content string
"""
skills = self.get_active_skills(context, context_usage)
combined = []
total_chars = 0
for skill_path in skills:
content = self.get_skill_content(skill_path)
if content:
# Rough token estimate: 4 chars per token
estimated_tokens = len(content) // 4
if total_chars + estimated_tokens < max_tokens * 4:
combined.append(f"\n\n=== Skill: {skill_path} ===\n{content}")
total_chars += estimated_tokens
return "\n".join(combined)
def get_tools_from_existing(
self,
all_tools: List[dict],
context: str,
context_usage: float = 0.0,
) -> List[dict]:
"""
Filter an existing tool list based on context.
Args:
all_tools: Full list of tool definitions
context: Conversation context
context_usage: 0.0-1.0 context window usage
Returns:
Filtered tool list optimized for context
"""
if context_usage > 0.9:
# Emergency mode - only tier 0
tier_0_names = TIER_0_TOOLS
return [t for t in all_tools if t["function"]["name"] in tier_0_names]
if context_usage > 0.8:
# High usage - tier 0 + critical tier 1
tier_0_names = TIER_0_TOOLS
critical_tools = {"terminal", "read_file", "patch", "search_files"}
allowed = tier_0_names | critical_tools
return [t for t in all_tools if t["function"]["name"] in allowed]
# Normal mode - use context analysis
hints = self._analyze_context(context)
allowed_tools = set(TIER_0_TOOLS)
for hint in hints:
if hint in TIER_1_BY_CONTEXT:
allowed_tools.update(TIER_1_BY_CONTEXT[hint])
# Filter and rank tools
result = []
for tool in all_tools:
name = tool["function"]["name"]
if name in allowed_tools:
result.append(tool)
return result
def load_tier_2_tool(self, tool_name: str) -> Optional[dict]:
"""
On-demand load a Tier 2 tool.
"""
if tool_name not in self._tier_2_cache:
self._ensure_tools_discovered()
self._tier_2_cache[tool_name] = self._load_single_tool(tool_name)
return self._tier_2_cache.get(tool_name)
def preload_for_tool(self, tool_name: str) -> None:
"""
Preload a tool's schema into cache.
"""
if tool_name in TIER_2_TOOLS:
self.load_tier_2_tool(tool_name)
def _ensure_tools_discovered(self) -> None:
"""Ensure all tools have been discovered and registered."""
if not self._tools_discovered:
try:
import model_tools
self._tools_discovered = True
except Exception:
pass
def _analyze_context(self, context: str) -> Set[str]:
"""Analyze context text and extract hints."""
hints = set()
context_lower = context.lower()
# Extract code extensions
extensions = re.findall(r'\.(\w+)', context_lower)
for ext in extensions:
if ext in CODE_EXTENSIONS:
hints.add(f"code:{CODE_EXTENSIONS[ext]}")
# Check domain keywords
for keyword in DOMAIN_KEYWORDS:
if keyword in context_lower:
hints.add(keyword)
# Check for specific phrases
tool_mentions = {
"web search": "web",
"search the web": "web",
"golang": "code:go",
"go language": "code:go",
"go code": "code:go",
" in go ": "code:go",
"write go ": "code:go",
"python code": "code:python",
"python script": "code:python",
" in python": "code:python",
"write python": "code:python",
"rust code": "code:rust",
"rust program": "code:rust",
" in rust": "code:rust",
"write rust": "code:rust",
"javascript": "code:js",
"typescript": "code:js",
"c++": "code:cpp",
"cron": "database", # Schedule jobs via terminal
"schedule": "database",
"voice": "ai",
"speak": "ai",
"delegate": "ai",
"subagent": "ai",
}
for phrase, hint in tool_mentions.items():
if phrase in context_lower:
hints.add(hint)
return hints
def _get_tier_1_tools(self) -> List[dict]:
"""Get Tier 1 tools based on context analysis."""
tools = []
seen = set()
for hint in self._context_hints:
if hint in TIER_1_BY_CONTEXT:
for tool_name in TIER_1_BY_CONTEXT[hint]:
if tool_name not in seen:
seen.add(tool_name)
tool_def = self._load_single_tool(tool_name)
if tool_def:
tools.append(tool_def)
return tools
def _load_tier_0_tools(self) -> List[dict]:
"""Load Tier 0 core tools."""
tools = []
for tool_name in TIER_0_TOOLS:
tool_def = self._load_single_tool(tool_name)
if tool_def:
tools.append(tool_def)
return tools
def _load_single_tool(self, tool_name: str) -> Optional[dict]:
"""Load a single tool's schema from registry."""
try:
from tools.registry import registry
entry = registry._tools.get(tool_name)
if not entry:
return None
# Check if tool is available
if entry.check_fn:
try:
if not entry.check_fn():
return None
except Exception:
return None
schema = entry.schema.copy()
schema["name"] = entry.name
return {"type": "function", "function": schema}
except Exception as e:
logger.debug(f"Failed to load tool {tool_name}: {e}")
return None
# =============================================================================
# Integration helpers
# =============================================================================
def filter_tools_with_context(
all_tools: List[dict],
context: str,
context_usage: float = 0.0,
) -> List[dict]:
"""
Filter a pre-loaded tool list based on context.
Usage:
from model_tools import get_tool_definitions
from agent.lazy_loader import filter_tools_with_context
all_tools = get_tool_definitions(enabled_toolsets=["terminal"])
filtered_tools = filter_tools_with_context(
all_tools,
context=conversation_context,
context_usage=0.5
)
"""
loader = get_lazy_tool_loader()
return loader.get_tools_from_existing(all_tools, context, context_usage)
def get_skills_for_context(
context: str,
context_usage: float = 0.0,
) -> List[str]:
"""
Get skill paths to load for the current context.
Usage:
from agent.lazy_loader import get_skills_for_context
skill_paths = get_skills_for_context("I need to review Python code")
for path in skill_paths:
content = loader.get_skill_content(path)
# Inject into context...
"""
loader = get_lazy_tool_loader()
return loader.get_active_skills(context, context_usage)
# =============================================================================
# Singleton instance
# =============================================================================
_lazy_tool_loader: Optional[LazyToolLoader] = None
def get_lazy_tool_loader() -> LazyToolLoader:
"""Get the singleton LazyToolLoader instance."""
global _lazy_tool_loader
if _lazy_tool_loader is None:
_lazy_tool_loader = LazyToolLoader()
return _lazy_tool_loader
# =============================================================================
# Helper functions
# =============================================================================
def get_tools_budget_aware(
context: str,
context_usage: float,
enabled_toolsets: Optional[List[str]] = None,
) -> List[dict]:
"""
Convenience function for budget-aware tool loading.
"""
loader = get_lazy_tool_loader()
return loader.get_tools_for_context(
context=context,
enabled_toolsets=enabled_toolsets,
context_usage=context_usage,
)
def preload_tools_for_hint(hint: str) -> None:
"""Preload tools based on a context hint."""
loader = get_lazy_tool_loader()
loader._context_hints.add(hint)

548
agent/manifest.py Normal file
View File

@@ -0,0 +1,548 @@
"""
Manifest Profile System
Defines named agent profiles with specific skill sets.
Users or context can select which profile to use,
allowing selective skill loading instead of loading everything.
Inspired by ECC's agent manifest system.
Usage:
from agent.manifest import ManifestLoader, get_active_profile, switch_profile
# Get current profile
profile = get_active_profile()
# Switch to a different profile
switch_profile("coding")
# Load skills for current profile
loader = ManifestLoader()
skills = loader.get_skills_for_profile("debugging")
"""
import yaml
import logging
from pathlib import Path
from typing import Dict, List, Optional, Set, Any, Callable
from dataclasses import dataclass, field
from functools import lru_cache
from hermes_constants import get_hermes_home
logger = logging.getLogger(__name__)
# =============================================================================
# Data Structures
# =============================================================================
@dataclass
class SkillRef:
"""Reference to a skill with optional parameters."""
path: str
enabled: bool = True
priority: int = 0 # Lower = higher priority
description: Optional[str] = None
@dataclass
class Profile:
"""An agent profile with associated skills."""
name: str
description: str
skills: List[SkillRef]
tags: List[str] = field(default_factory=list)
auto_activate: bool = False # Auto-activate based on context hints
@property
def skill_paths(self) -> List[str]:
"""Get list of skill paths for this profile."""
return [s.path for s in self.skills if s.enabled]
@dataclass
class Manifest:
"""Complete manifest with all profiles."""
version: str
profiles: Dict[str, Profile]
default_profile: str
metadata: Dict[str, Any] = field(default_factory=dict)
@classmethod
def from_dict(cls, data: Dict) -> "Manifest":
"""Create Manifest from dictionary."""
profiles = {}
for name, profile_data in data.get("profiles", {}).items():
skills = []
for skill_data in profile_data.get("skills", []):
if isinstance(skill_data, str):
# Simple skill reference
skills.append(SkillRef(path=skill_data))
elif isinstance(skill_data, dict):
# Skill with extra config
skills.append(SkillRef(
path=skill_data.get("path", skill_data.get("name", "")),
enabled=skill_data.get("enabled", True),
priority=skill_data.get("priority", 0),
description=skill_data.get("description"),
))
profiles[name] = Profile(
name=profile_data.get("name", name),
description=profile_data.get("description", ""),
skills=skills,
tags=profile_data.get("tags", []),
auto_activate=profile_data.get("auto_activate", False),
)
return cls(
version=data.get("version", "1.0.0"),
profiles=profiles,
default_profile=data.get("default_profile", "general"),
metadata=data.get("metadata", {}),
)
# =============================================================================
# Built-in Default Manifest
# =============================================================================
DEFAULT_MANIFEST_YAML = """
# Hermes Agent Manifest
# Defines agent profiles with specific skill sets
# Profiles allow selective skill loading based on context
version: "1.0.0"
default_profile: coding
profiles:
coding:
name: "Coding Agent"
description: "For code review, refactoring, and implementation"
tags: [code, review, refactor]
auto_activate: true
skills:
- path: software-development/python-reviewer
priority: 10
- path: software-development/go-reviewer
priority: 20
- path: software-development/rust-reviewer
priority: 20
- path: software-development/javascript-reviewer
priority: 30
- path: software-development/two-stage-review
priority: 5
- path: software-development/build-error-resolver
priority: 15
debugging:
name: "Debug Agent"
description: "For systematic debugging and problem solving"
tags: [debug, problem-solving]
auto_activate: true
skills:
- path: software-development/systematic-debugging
priority: 5
- path: software-development/build-error-resolver
priority: 10
shipping:
name: "Ship Agent"
description: "For deployment, CI/CD, and releasing"
tags: [deploy, ship, ci-cd]
auto_activate: true
skills:
- path: software-development/idempotent-ship
priority: 5
- path: software-development/hooks
priority: 10
planning:
name: "Planning Agent"
description: "For brainstorming and planning"
tags: [plan, brainstorm]
auto_activate: true
skills:
- path: software-development/brainstorming
priority: 5
- path: software-development/writing-plans
priority: 10
research:
name: "Research Agent"
description: "For ML/AI research and exploration"
tags: [research, ml, ai]
auto_activate: true
skills:
- path: research/llm-wiki
priority: 5
- path: mlops/huggingface-hub
priority: 10
- path: mlops/weights-and-biases
priority: 20
general:
name: "General Agent"
description: "General purpose assistant"
tags: [general]
skills: []
"""
# =============================================================================
# Manifest Loader
# =============================================================================
class ManifestLoader:
"""
Loads and manages agent manifest profiles.
The manifest defines named profiles with specific skill sets.
Skills are loaded selectively based on the active profile.
"""
def __init__(self, manifest_path: Optional[Path] = None):
"""
Initialize manifest loader.
Args:
manifest_path: Path to manifest YAML file.
Defaults to ~/.hermes/manifest.yaml
"""
if manifest_path is None:
manifest_path = get_hermes_home() / "manifest.yaml"
self.manifest_path = manifest_path
self._manifest: Optional[Manifest] = None
self._active_profile_name: Optional[str] = None
self._profile_hints: Set[str] = set()
def load_manifest(self) -> Manifest:
"""Load manifest from file or use default."""
if self._manifest is not None:
return self._manifest
# Try to load from file
if self.manifest_path.exists():
try:
data = yaml.safe_load(self.manifest_path.read_text())
self._manifest = Manifest.from_dict(data)
logger.info(f"Loaded manifest from {self.manifest_path}")
return self._manifest
except Exception as e:
logger.warning(f"Failed to load manifest: {e}")
# Use default manifest
self._manifest = Manifest.from_dict(yaml.safe_load(DEFAULT_MANIFEST_YAML))
logger.info("Using default manifest")
return self._manifest
def get_profile(self, name: str) -> Optional[Profile]:
"""Get a profile by name."""
manifest = self.load_manifest()
return manifest.profiles.get(name)
def get_active_profile(self) -> Profile:
"""Get the currently active profile."""
manifest = self.load_manifest()
if self._active_profile_name and self._active_profile_name in manifest.profiles:
return manifest.profiles[self._active_profile_name]
# Return default profile
return manifest.profiles[manifest.default_profile]
def switch_profile(self, name: str) -> bool:
"""
Switch to a different profile.
Args:
name: Profile name to switch to
Returns:
True if switch successful, False if profile doesn't exist
"""
manifest = self.load_manifest()
if name not in manifest.profiles:
logger.warning(f"Profile '{name}' not found")
return False
old_profile = self._active_profile_name
self._active_profile_name = name
logger.info(f"Switched profile: {old_profile}{name}")
return True
def auto_select_profile(self, context: str) -> Profile:
"""
Automatically select profile based on context.
Uses tag matching and hint detection to find the best profile.
Uses word boundary matching to avoid false positives.
Profile-specific tags (defined in profile) get higher weight.
Args:
context: Conversation context
Returns:
Selected profile
"""
import re
manifest = self.load_manifest()
context_lower = context.lower()
best_profile = None
best_score = 0
best_match_count = 0
# Pre-process context into words
context_words = set(re.findall(r'\b\w+\b', context_lower))
for name, profile in manifest.profiles.items():
if not profile.auto_activate:
continue
score = 0
match_count = 0
# Check tags against context (word boundary matching)
# Primary tag (first in list) and early-position matches get higher weight
for i, tag in enumerate(profile.tags):
if not tag:
continue
# Check if tag appears in context
if tag in context_words:
# Primary tag (first) gets higher base weight
base_weight = 20 if i == 0 else 8
# Find position of tag in original context for position scoring
tag_pos = context_lower.find(tag)
if tag_pos >= 0:
# Tags appearing in first 30% of context get bonus (main intent)
if tag_pos < len(context_lower) * 0.3:
score += base_weight + 5
else:
score += base_weight
else:
score += base_weight
match_count += 1
# Fall back to substring match (lower priority)
elif tag in context_lower:
base_weight = 8 if i == 0 else 3
score += base_weight
match_count += 1
# Profile name match is a strong signal (user explicitly mentioned this agent type)
profile_name_words = set(re.findall(r'\b\w+\b', profile.name.lower()))
for word in profile_name_words:
if word in context_words and len(word) > 3: # Skip short words
score += 20 # Strong signal
match_count += 2
# Check if profile name/description matches
if profile.name.lower() in context_lower:
score += 3
if profile.description.lower() in context_lower:
score += 1
# Update best if this profile is better
# Tie-breaker: prefer profile with more tag matches
if score > best_score or (score == best_score and match_count > best_match_count):
best_score = score
best_match_count = match_count
best_profile = profile
if best_profile and best_score > 0:
self._active_profile_name = best_profile.name
logger.info(f"Auto-selected profile: {best_profile.name} (score={best_score})")
return best_profile
# Return default
return manifest.profiles[manifest.default_profile]
def get_skills_for_profile(self, profile_name: str) -> List[str]:
"""
Get skill paths for a profile.
Args:
profile_name: Name of the profile
Returns:
List of skill paths
"""
profile = self.get_profile(profile_name)
if profile is None:
return []
return profile.skill_paths
def get_skills_for_active_profile(self) -> List[str]:
"""Get skill paths for the active profile."""
return self.get_active_profile().skill_paths
def add_profile(self, profile: Profile) -> None:
"""
Add a new profile to the manifest.
Args:
profile: Profile to add
"""
manifest = self.load_manifest()
manifest.profiles[profile.name] = profile
def remove_profile(self, name: str) -> bool:
"""
Remove a profile from the manifest.
Args:
name: Profile name to remove
Returns:
True if removed, False if didn't exist
"""
manifest = self.load_manifest()
if name in manifest.profiles:
del manifest.profiles[name]
if self._active_profile_name == name:
self._active_profile_name = manifest.default_profile
return True
return False
def save_manifest(self) -> bool:
"""
Save current manifest to file.
Returns:
True if saved successfully
"""
if self._manifest is None:
return False
try:
# Convert manifest to dict
data = {
"version": self._manifest.version,
"default_profile": self._manifest.default_profile,
"metadata": self._manifest.metadata,
"profiles": {},
}
for name, profile in self._manifest.profiles.items():
skills = []
for skill in profile.skills:
skill_dict = {"path": skill.path}
if not skill.enabled:
skill_dict["enabled"] = False
if skill.priority != 0:
skill_dict["priority"] = skill.priority
if skill.description:
skill_dict["description"] = skill.description
skills.append(skill_dict)
data["profiles"][name] = {
"name": profile.name,
"description": profile.description,
"tags": profile.tags,
"auto_activate": profile.auto_activate,
"skills": skills,
}
# Ensure directory exists
self.manifest_path.parent.mkdir(parents=True, exist_ok=True)
# Write file
self.manifest_path.write_text(yaml.dump(data, default_flow_style=False))
logger.info(f"Saved manifest to {self.manifest_path}")
return True
except Exception as e:
logger.error(f"Failed to save manifest: {e}")
return False
# =============================================================================
# Integration with LazyToolLoader
# =============================================================================
def get_skills_for_current_profile(
context: Optional[str] = None,
manifest_path: Optional[Path] = None,
) -> List[str]:
"""
Get skill paths for the current profile.
This integrates with the lazy loader to provide profile-based
skill selection.
Args:
context: Optional context for auto-selection
manifest_path: Optional path to manifest file
Returns:
List of skill paths
"""
loader = ManifestLoader(manifest_path)
# Auto-select profile if context provided
if context:
loader.auto_select_profile(context)
return loader.get_skills_for_active_profile()
def get_profile_skill_content(
profile_name: Optional[str] = None,
manifest_path: Optional[Path] = None,
) -> str:
"""
Get combined skill content for a profile.
Args:
profile_name: Profile to get content for. If None, uses active.
manifest_path: Optional path to manifest file
Returns:
Combined skill content string
"""
from agent.lazy_loader import get_lazy_tool_loader
loader = ManifestLoader(manifest_path)
lazy = get_lazy_tool_loader()
if profile_name:
skills = loader.get_skills_for_profile(profile_name)
else:
skills = loader.get_skills_for_active_profile()
combined = []
for skill_path in skills:
content = lazy.get_skill_content(skill_path)
if content:
combined.append(f"\n\n=== Skill: {skill_path} ===\n{content}")
return "\n".join(combined)
# =============================================================================
# Singleton
# =============================================================================
_manifest_loader: Optional[ManifestLoader] = None
def get_manifest_loader() -> ManifestLoader:
"""Get the singleton ManifestLoader instance."""
global _manifest_loader
if _manifest_loader is None:
_manifest_loader = ManifestLoader()
return _manifest_loader
def get_active_profile() -> Profile:
"""Get the active profile."""
return get_manifest_loader().get_active_profile()
def switch_profile(name: str) -> bool:
"""Switch to a different profile."""
return get_manifest_loader().switch_profile(name)

16
hermes-gateway.service Normal file
View File

@@ -0,0 +1,16 @@
[Unit]
Description=Hermes Gateway - AI Agent Messaging Platform
After=network.target
[Service]
Type=simple
User=long
WorkingDirectory=/home/long/hermes-agent
Environment="PATH=/home/long/hermes-agent/venv/bin:/usr/local/bin:/usr/bin:/bin"
Environment="HERMES_HOME=/home/long/.hermes"
ExecStart=/home/long/hermes-agent/venv/bin/python /home/long/hermes-agent/gateway/run.py
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target

View File

@@ -158,6 +158,8 @@ def _discover_tools():
"tools.send_message_tool",
# "tools.honcho_tools", # Removed — Honcho is now a memory provider plugin
"tools.homeassistant_tool",
"tools.hook_tool",
"tools.reviewer_tool",
]
import importlib
for mod_name in _modules:

View File

@@ -547,6 +547,23 @@ def write_file_tool(path: str, content: str, task_id: str = "default") -> str:
stale_warning = _check_file_staleness(path, task_id)
file_ops = _get_file_ops(task_id)
result = file_ops.write_file(path, content)
# Run post-write hooks on successful write
if result.error is None:
try:
from tools.hooks import run_hooks, EVENT_FILE_WRITE
hook_results = run_hooks(EVENT_FILE_WRITE, file_path=path)
blocked = [r for r in hook_results if r.blocked]
if blocked:
# Hook blocked - pre-write or post-write hook failed with block_on_failure
return json.dumps({
"error": f"Hook blocked: {blocked[0].message}",
"_hooks_blocked": True,
"_hook_name": blocked[0].hook_name,
}, ensure_ascii=False)
except Exception as hook_err:
logger.warning("Hook execution failed: %s", hook_err)
result_dict = result.to_dict()
if stale_warning:
result_dict["_warning"] = stale_warning

150
tools/hook_tool.py Normal file
View File

@@ -0,0 +1,150 @@
"""Hook management tools for Hermes.
Provides tools for listing, creating, deleting, enabling, and disabling hooks.
Hooks are YAML files in ~/.hermes/hooks/ that run actions on file/command events.
"""
import json
import os
from tools.registry import registry
# Lazy import to avoid circular imports
def _get_hooks_module():
from tools import hooks as _hooks
return _hooks
def _hooks_list(action: str = "list", hook_path: str = None) -> str:
"""Tool wrapper for hook operations."""
hooks = _get_hooks_module()
if action == "list":
result = hooks.list_hooks()
if not result:
return json.dumps({"hooks": {}, "message": "No hooks configured"})
return json.dumps({"hooks": result}, ensure_ascii=False)
elif action == "create":
if not hook_path:
return json.dumps({"error": "hook_path required for create"})
parts = hook_path.split("/", 1)
if len(parts) != 2:
return json.dumps({"error": "hook_path must be in format '<subdir>/<name>' (e.g. 'post-write/pytest-on-py')"})
subdir, name = parts
if subdir not in ("pre-write", "post-write", "pre-command", "post-command"):
return json.dumps({"error": f"Invalid subdir '{subdir}'. Must be one of: pre-write, post-write, pre-command, post-command"})
success, msg = hooks.create_hook(
name=name,
subdir=subdir,
trigger_event="file_write",
pattern="*.py",
actions=[{"type": "command", "command": "pytest {file}", "timeout": 60, "block_on_failure": True}],
description=f"Auto-created hook: {name}",
)
return json.dumps({"success": success, "message": msg}, ensure_ascii=False)
elif action == "delete":
if not hook_path:
return json.dumps({"error": "hook_path required for delete"})
success, msg = hooks.delete_hook(hook_path)
return json.dumps({"success": success, "message": msg}, ensure_ascii=False)
elif action == "enable":
if not hook_path:
return json.dumps({"error": "hook_path required for enable"})
success, msg = hooks.enable_hook(hook_path)
return json.dumps({"success": success, "message": msg}, ensure_ascii=False)
elif action == "disable":
if not hook_path:
return json.dumps({"error": "hook_path required for disable"})
success, msg = hooks.disable_hook(hook_path)
return json.dumps({"success": success, "message": msg}, ensure_ascii=False)
elif action == "dry_run":
if not hook_path:
return json.dumps({"error": "hook_path required for dry_run"})
# Dry run a specific hook
hooks_dir = hooks._get_hooks_dir()
full_path = hooks_dir / hook_path
if not full_path.exists():
return json.dumps({"error": f"Hook not found: {hook_path}"})
hook = hooks._load_hook_file(full_path)
if not hook:
return json.dumps({"error": f"Failed to load hook: {hook_path}"})
# Simulate what would happen
return json.dumps({
"hook": hook.name,
"description": hook.description,
"trigger_event": hook.trigger_event,
"trigger_pattern": hook.trigger_pattern,
"actions": [{"type": a.type, "command": a.command} for a in hook.actions],
"dry_run": True,
"note": "Commands would be shown but not executed",
}, ensure_ascii=False)
else:
return json.dumps({"error": f"Unknown action: {action}"})
# Schema for hook tool
_hooks_schema = {
"name": "hooks",
"description": """Manage Hermes hooks — trigger-based automation for file and command events.
Hooks run actions (commands, notifications) when files are written or commands are executed.
**Hook directories:**
- `pre-write/` — before file write (can block the write)
- `post-write/` — after file write
- `pre-command/` — before terminal command (can block the command)
- `post-command/` — after terminal command
**Actions:**
- `command` — run a shell command
- `notification` — log or send notification
- `log` — log a message
**Examples:**
- List all hooks: `{"action": "list"}`
- Create a Python test hook: `{"action": "create", "hook_path": "post-write/pytest-on-py"}`
- Delete a hook: `{"action": "delete", "hook_path": "post-write/pytest-on-py"}`
- Dry run a hook: `{"action": "dry_run", "hook_path": "post-write/pytest-on-py"}`""",
"parameters": {
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["list", "create", "delete", "enable", "disable", "dry_run"],
"description": "Action to perform",
},
"hook_path": {
"type": "string",
"description": "Hook path for create/delete/enable/disable/dry_run. Format: '<subdir>/<name>' e.g. 'post-write/pytest-on-py'",
},
},
"required": ["action"],
},
}
def _check_hooks_available() -> bool:
"""Check if hooks module is available."""
return True
registry.register(
name="hooks",
toolset="file",
schema=_hooks_schema,
handler=lambda args, **kw: _hooks_list(
action=args.get("action", "list"),
hook_path=args.get("hook_path"),
),
check_fn=_check_hooks_available,
requires_env=[],
is_async=False,
description="Manage Hermes hooks for file/command automation",
emoji="🪝",
)

578
tools/hooks.py Normal file
View File

@@ -0,0 +1,578 @@
#!/usr/bin/env python3
"""
Hermes Hook System - Trigger-based automation for file and command events.
Hooks are YAML config files in ~/.hermes/hooks/ that run actions when
specific events occur (file writes, terminal commands, etc.).
Directory Structure:
~/.hermes/hooks/
├── pre-write/ # Before file write
│ └── <hook>.yaml
├── post-write/ # After file write
│ └── <hook>.yaml
├── pre-command/ # Before terminal command
│ └── <hook>.yaml
└── post-command/ # After terminal command
└── <hook>.yaml
Hook Config Format:
name: pytest-on-py
description: Run pytest on Python files after write
enabled: true
trigger:
event: file_write # file_write | file_patch | terminal_command
pattern: "*.py" # glob pattern
path: null # optional directory filter
actions:
- type: command
command: pytest {file}
timeout: 60
block_on_failure: true
- type: notification
message: "Tests passed for {file}"
conditions:
if_file_exists: true # only for pre-write (file already exists)
if_command_exists: [pytest, ruff]
Exit Codes:
- Hook command exit 0 + block_on_failure=false → continue
- Hook command exit 0 + block_on_failure=true → continue
- Hook command exit != 0 + block_on_failure=true → BLOCK
- Hook command exit != 0 + block_on_failure=false → warn + continue
"""
import fnmatch
import json
import logging
import os
import subprocess
import threading
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Callable, Optional
import yaml
from hermes_constants import get_hermes_home
logger = logging.getLogger(__name__)
# Hook event types
EVENT_FILE_WRITE = "file_write"
EVENT_FILE_PATCH = "file_patch"
EVENT_FILE_READ = "file_read"
EVENT_TERMINAL_COMMAND = "terminal_command"
# Hook directories relative to HERMES_HOME
HOOK_SUBDIRS = ["pre-write", "post-write", "pre-command", "post-command"]
# Cache for loaded hooks (thread-safe)
_hooks_cache: dict[str, list["Hook"]] = {}
_cache_lock = threading.RLock()
_cache_version: int = 0
def _get_hooks_dir() -> Path:
"""Return the hooks directory path."""
return get_hermes_home() / "hooks"
@dataclass
class HookAction:
"""A single action within a hook."""
type: str # "command", "notification", "log"
command: Optional[str] = None
message: Optional[str] = None
timeout: int = 30
block_on_failure: bool = False
continue_on_error: bool = False
platform: Optional[str] = None # telegram, discord, etc.
@dataclass
class Hook:
"""A loaded hook configuration."""
name: str
description: str
enabled: bool = True
trigger_event: Optional[str] = None
trigger_pattern: Optional[str] = None
trigger_path: Optional[str] = None
actions: list[HookAction] = field(default_factory=list)
conditions: dict[str, Any] = field(default_factory=dict)
source_file: Optional[Path] = None
def matches(self, event: str, file_path: Optional[str] = None) -> bool:
"""Check if this hook matches the given event and file."""
if not self.enabled:
return False
if self.trigger_event and self.trigger_event != event:
return False
if self.trigger_pattern and file_path:
if not fnmatch.fnmatch(Path(file_path).name, self.trigger_pattern):
return False
if self.trigger_path and file_path:
if not Path(file_path).as_posix().startswith(self.trigger_path):
return False
return True
@dataclass
class HookResult:
"""Result of running a hook."""
hook_name: str
success: bool
blocked: bool = False
message: str = ""
details: str = ""
def load_hooks(event: str) -> list[Hook]:
"""Load all enabled hooks matching the given event type."""
global _cache_version
cache_key = event
with _cache_lock:
# Check if we have a valid cache
if cache_key in _hooks_cache:
return _hooks_cache[cache_key]
hooks_dir = _get_hooks_dir()
loaded_hooks: list[Hook] = []
# Determine which subdirs to check based on event
subdir_map = {
EVENT_FILE_WRITE: ["pre-write", "post-write"],
EVENT_FILE_PATCH: ["pre-write", "post-write"],
EVENT_FILE_READ: [],
EVENT_TERMINAL_COMMAND: ["pre-command", "post-command"],
}
subdirs = subdir_map.get(event, [])
for subdir in subdirs:
subdir_path = hooks_dir / subdir
if not subdir_path.is_dir():
continue
for hook_file in subdir_path.glob("*.yaml"):
try:
hook = _load_hook_file(hook_file)
if hook and hook.matches(event):
loaded_hooks.append(hook)
except Exception as e:
logger.warning(f"Failed to load hook {hook_file}: {e}")
with _cache_lock:
_hooks_cache[cache_key] = loaded_hooks
return loaded_hooks
def _load_hook_file(path: Path) -> Optional[Hook]:
"""Load a single hook YAML file."""
try:
with open(path, "r") as f:
data = yaml.safe_load(f)
if not data:
return None
name = data.get("name", path.stem)
enabled = data.get("enabled", True)
trigger = data.get("trigger", {})
actions = []
for action_data in data.get("actions", []):
action = HookAction(
type=action_data.get("type", "command"),
command=action_data.get("command"),
message=action_data.get("message"),
timeout=action_data.get("timeout", 30),
block_on_failure=action_data.get("block_on_failure", False),
continue_on_error=action_data.get("continue_on_error", False),
platform=action_data.get("platform"),
)
actions.append(action)
conditions = data.get("conditions", {})
# Determine event type from subdir
subdir = path.parent.name
event_map = {
"pre-write": EVENT_FILE_WRITE,
"post-write": EVENT_FILE_WRITE,
"pre-command": EVENT_TERMINAL_COMMAND,
"post-command": EVENT_TERMINAL_COMMAND,
}
hook_event = trigger.get("event") or event_map.get(subdir, EVENT_FILE_WRITE)
return Hook(
name=name,
description=data.get("description", ""),
enabled=enabled,
trigger_event=hook_event,
trigger_pattern=trigger.get("pattern"),
trigger_path=trigger.get("path"),
actions=actions,
conditions=conditions,
source_file=path,
)
except Exception as e:
logger.error(f"Error loading hook {path}: {e}")
return None
def _check_conditions(hook: Hook, file_path: Optional[str] = None) -> bool:
"""Check if hook conditions are met."""
conditions = hook.conditions
# Check if_command_exists
if "if_command_exists" in conditions:
required_commands = conditions["if_command_exists"]
if isinstance(required_commands, str):
required_commands = [required_commands]
for cmd in required_commands:
if not _command_exists(cmd):
return False
# Check if_file_exists
if "if_file_exists" in conditions:
if file_path:
exists = Path(file_path).exists()
if conditions["if_file_exists"] and not exists:
return False
if not conditions["if_file_exists"] and exists:
return False
return True
def _command_exists(cmd: str) -> bool:
"""Check if a command exists in PATH."""
try:
result = subprocess.run(
["which", cmd],
capture_output=True,
text=True,
timeout=5,
)
return result.returncode == 0
except Exception:
return False
def _expand_variables(text: str, context: dict[str, Any]) -> str:
"""Expand {variable} placeholders in text."""
result = text
for key, value in context.items():
placeholder = "{" + key + "}"
if placeholder in result:
result = result.replace(placeholder, str(value))
return result
def _run_hook_action(
action: HookAction, context: dict[str, Any], dry_run: bool = False
) -> tuple[bool, str]:
"""
Run a single hook action.
Returns (success, message).
"""
if action.type == "command":
if not action.command:
return True, "No command specified"
if dry_run:
return True, f"[DRY RUN] Would run: {action.command}"
command = _expand_variables(action.command, context)
try:
result = subprocess.run(
command,
shell=True,
capture_output=True,
text=True,
timeout=action.timeout,
)
if result.returncode != 0:
error_msg = f"Command failed: {result.stderr or result.stdout}"
if action.block_on_failure:
return False, error_msg
elif not action.continue_on_error:
return False, error_msg
# continue_on_error = True → warn but continue
return True, f"WARN: {error_msg}"
return True, result.stdout or "Command succeeded"
except subprocess.TimeoutExpired:
return False, f"Command timed out after {action.timeout}s"
except Exception as e:
return False, f"Command error: {e}"
elif action.type == "notification":
message = _expand_variables(action.message or "", context)
platform = action.platform
if platform == "telegram":
# Would integrate with send_message_tool
logger.info(f"[NOTIFICATION] {message}")
return True, f"Notification sent: {message}"
elif platform == "discord":
logger.info(f"[DISCORD] {message}")
return True, f"Discord notification sent: {message}"
else:
# Log to stdout
logger.info(f"[NOTIFICATION] {message}")
return True, message
elif action.type == "log":
message = _expand_variables(action.message or "", context)
logger.info(f"[HOOK LOG] {message}")
return True, message
return True, f"Unknown action type: {action.type}"
def run_hooks(
event: str,
file_path: Optional[str] = None,
command: Optional[str] = None,
context: Optional[dict[str, Any]] = None,
dry_run: bool = False,
) -> list[HookResult]:
"""
Run all hooks matching the given event.
Args:
event: Event type (EVENT_FILE_WRITE, etc.)
file_path: File path for file events
command: Command for terminal events
context: Additional context for variable expansion
dry_run: If True, don't execute commands, just show what would run
Returns:
List of HookResult, one per hook that ran
"""
hooks = load_hooks(event)
results: list[HookResult] = []
blocked = False
# Build context for variable expansion
ctx = context or {}
if file_path:
ctx["file"] = file_path
ctx["filename"] = Path(file_path).name
ctx["dir"] = str(Path(file_path).parent)
if command:
ctx["command"] = command
for hook in hooks:
# Check conditions
if not _check_conditions(hook, file_path):
continue
# Check if pre-write and file doesn't exist (for new files)
if "if_file_exists" in hook.conditions:
# Already checked in _check_conditions
pass
hook_result = HookResult(
hook_name=hook.name,
success=True,
blocked=False,
message="",
details="",
)
for action in hook.actions:
success, message = _run_hook_action(action, ctx, dry_run)
if not success:
hook_result.success = False
hook_result.details = message
if action.block_on_failure:
hook_result.blocked = True
hook_result.message = f"Hook '{hook.name}' blocked: {message}"
blocked = True
results.append(hook_result)
return results # Early return on block
else:
hook_result.message = f"Hook '{hook.name}' warned: {message}"
else:
if message:
hook_result.details += message + "\n"
results.append(hook_result)
if blocked:
# Add a summary result if hooks blocked
results.insert(
0,
HookResult(
hook_name="__HOOK_BLOCK__",
success=False,
blocked=True,
message="Hook execution was blocked",
),
)
return results
def invalidate_cache():
"""Clear the hooks cache to force reload."""
global _cache_version
with _cache_lock:
_hooks_cache.clear()
_cache_version += 1
def list_hooks() -> dict[str, list[dict[str, Any]]]:
"""List all configured hooks by type."""
hooks_dir = _get_hooks_dir()
result: dict[str, list[dict[str, Any]]] = {}
for subdir in HOOK_SUBDIRS:
subdir_path = hooks_dir / subdir
if not subdir_path.is_dir():
continue
hooks_list: list[dict[str, Any]] = []
for hook_file in sorted(subdir_path.glob("*.yaml")):
try:
hook = _load_hook_file(hook_file)
if hook:
hooks_list.append(
{
"name": hook.name,
"description": hook.description,
"enabled": hook.enabled,
"trigger_event": hook.trigger_event,
"trigger_pattern": hook.trigger_pattern,
"actions_count": len(hook.actions),
"source": str(hook_file.relative_to(hooks_dir)),
}
)
except Exception as e:
logger.warning(f"Failed to load hook {hook_file}: {e}")
if hooks_list:
result[subdir] = hooks_list
return result
def create_hook(
name: str,
subdir: str,
trigger_event: str,
pattern: Optional[str] = None,
actions: Optional[list[dict[str, Any]]] = None,
description: str = "",
) -> tuple[bool, str]:
"""
Create a new hook file.
Args:
name: Hook name (will be filename)
subdir: One of pre-write, post-write, pre-command, post-command
trigger_event: Event type
pattern: Optional glob pattern
actions: List of action dicts
description: Hook description
Returns:
(success, message)
"""
hooks_dir = _get_hooks_dir()
subdir_path = hooks_dir / subdir
try:
subdir_path.mkdir(parents=True, exist_ok=True)
hook_data = {
"name": name,
"description": description,
"enabled": True,
"trigger": {"event": trigger_event},
"actions": actions or [],
}
if pattern:
hook_data["trigger"]["pattern"] = pattern
hook_file = subdir_path / f"{name}.yaml"
with open(hook_file, "w") as f:
yaml.dump(hook_data, f, default_flow_style=False, sort_keys=False)
invalidate_cache()
return True, f"Created hook: {hook_file}"
except Exception as e:
return False, f"Failed to create hook: {e}"
def delete_hook(hook_path: str) -> tuple[bool, str]:
"""
Delete a hook by its relative path (e.g., 'post-write/pytest-on-py').
Returns:
(success, message)
"""
hooks_dir = _get_hooks_dir()
full_path = hooks_dir / hook_path
if not full_path.exists() or not str(full_path).startswith(str(hooks_dir)):
return False, "Hook not found or invalid path"
try:
full_path.unlink()
invalidate_cache()
return True, f"Deleted hook: {hook_path}"
except Exception as e:
return False, f"Failed to delete hook: {e}"
def enable_hook(hook_path: str) -> tuple[bool, str]:
"""Enable a hook."""
return _set_hook_enabled(hook_path, True)
def disable_hook(hook_path: str) -> tuple[bool, str]:
"""Disable a hook."""
return _set_hook_enabled(hook_path, False)
def _set_hook_enabled(hook_path: str, enabled: bool) -> tuple[bool, str]:
"""Set a hook's enabled state."""
hooks_dir = _get_hooks_dir()
full_path = hooks_dir / hook_path
if not full_path.exists():
return False, "Hook not found"
try:
with open(full_path, "r") as f:
data = yaml.safe_load(f)
data["enabled"] = enabled
with open(full_path, "w") as f:
yaml.dump(data, f, default_flow_style=False, sort_keys=False)
invalidate_cache()
return True, f"{'Enabled' if enabled else 'Disabled'} hook: {hook_path}"
except Exception as e:
return False, f"Failed to update hook: {e}"

684
tools/reviewer_tool.py Normal file
View File

@@ -0,0 +1,684 @@
"""
Code Reviewer Tool
Runs code quality checks for various languages.
Automatically detects language and runs appropriate linters, type checkers, and tests.
Usage:
from tools.reviewer_tool import run_code_review
result = run_code_review(
language="python",
paths=["src/"],
check_types=["lint", "typecheck", "test"]
)
"""
import json
import logging
import os
import subprocess
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, List, Optional, Any, Tuple
logger = logging.getLogger(__name__)
# =============================================================================
# Language Configurations
# =============================================================================
LANGUAGE_CONFIG: Dict[str, Dict[str, Any]] = {
"python": {
"extensions": [".py"],
"lint": {
"commands": [
("ruff", ["ruff", "check", "."]),
("flake8", ["flake8", "."]),
],
"default": "ruff",
},
"typecheck": {
"commands": [
("mypy", ["mypy", "."]),
],
"default": "mypy",
},
"format": {
"commands": [
("ruff format", ["ruff", "format", "--check", "."]),
("black", ["black", "--check", "."]),
("isort", ["isort", "--check-only", "."]),
],
"default": "ruff format",
},
"test": {
"commands": [
("pytest", ["pytest", "-v", "--tb=short"]),
("pytest coverage", ["pytest", "--cov=.", "--cov-report=term-missing"]),
],
"default": "pytest",
},
"security": {
"commands": [
("bandit", ["bandit", "-r", "."]),
("safety", ["safety", "check"]),
],
"default": "bandit",
},
},
"javascript": {
"extensions": [".js", ".jsx", ".mjs"],
"lint": {
"commands": [
("eslint", ["npx", "eslint", "src/"]),
("eslint fix", ["npx", "eslint", "src/", "--fix"]),
],
"default": "eslint",
},
"typecheck": {
"commands": [
("tsc", ["npx", "tsc", "--noEmit"]),
],
"default": "tsc",
},
"format": {
"commands": [
("prettier", ["npx", "prettier", "--check", "src/"]),
],
"default": "prettier",
},
"test": {
"commands": [
("jest", ["npm", "test"]),
("vitest", ["npx", "vitest", "run"]),
],
"default": "jest",
},
},
"typescript": {
"extensions": [".ts", ".tsx"],
"lint": {
"commands": [
("eslint", ["npx", "eslint", "src/", "--ext", ".ts,.tsx"]),
],
"default": "eslint",
},
"typecheck": {
"commands": [
("tsc", ["npx", "tsc", "--noEmit", "--project", "tsconfig.json"]),
],
"default": "tsc",
},
"format": {
"commands": [
("prettier", ["npx", "prettier", "--check", "src/"]),
],
"default": "prettier",
},
"test": {
"commands": [
("jest", ["npx", "jest"]),
("vitest", ["npx", "vitest", "run"]),
],
"default": "jest",
},
},
"go": {
"extensions": [".go"],
"lint": {
"commands": [
("golangci-lint", ["golangci-lint", "run", "./..."]),
("gofmt", ["gofmt", "-d", "."]),
],
"default": "golangci-lint",
},
"typecheck": {
"commands": [
("go vet", ["go", "vet", "./..."]),
],
"default": "go vet",
},
"format": {
"commands": [
("gofmt", ["gofmt", "-d", "."]),
("go fmt", ["go", "fmt", "./..."]),
],
"default": "gofmt",
},
"test": {
"commands": [
("go test", ["go", "test", "-v", "./..."]),
("go test coverage", ["go", "test", "-coverprofile=coverage.out", "./..."]),
],
"default": "go test",
},
},
"rust": {
"extensions": [".rs"],
"lint": {
"commands": [
("clippy", ["cargo", "clippy", "--", "-D", "warnings"]),
],
"default": "clippy",
},
"typecheck": {
"commands": [
("cargo check", ["cargo", "check"]),
],
"default": "cargo check",
},
"format": {
"commands": [
("cargo fmt", ["cargo", "fmt", "--", "--check"]),
],
"default": "cargo fmt",
},
"test": {
"commands": [
("cargo test", ["cargo", "test", "--", "--nocapture"]),
],
"default": "cargo test",
},
},
"java": {
"extensions": [".java"],
"lint": {
"commands": [
("checkstyle", ["mvn", "checkstyle:check"]),
],
"default": "checkstyle",
},
"typecheck": {
"commands": [
("mvn compile", ["mvn", "compile"]),
],
"default": "mvn compile",
},
"format": {
"commands": [
("mvn formatter", ["mvn", "formatter:format"]),
],
"default": "mvn formatter",
},
"test": {
"commands": [
("mvn test", ["mvn", "test"]),
],
"default": "mvn test",
},
},
"cpp": {
"extensions": [".cpp", ".hpp", ".h", ".cc"],
"lint": {
"commands": [
("clang-tidy", ["clang-tidy", "src/**"]),
("cppcheck", ["cppcheck", "--enable=all", "src/"]),
],
"default": "clang-tidy",
},
"typecheck": {
"commands": [
("cmake build", ["cmake", "--build", "build", "--target", "all"]),
],
"default": "cmake build",
},
"format": {
"commands": [
("clang-format", ["clang-format", "-i", "src/**"]),
],
"default": "clang-format",
},
"test": {
"commands": [
("ctest", ["ctest", "--output-on-failure"]),
],
"default": "ctest",
},
},
"csharp": {
"extensions": [".cs"],
"lint": {
"commands": [
("dotnet format", ["dotnet", "format", "--verify-no-changes"]),
],
"default": "dotnet format",
},
"typecheck": {
"commands": [
("dotnet build", ["dotnet", "build", "--no-restore"]),
],
"default": "dotnet build",
},
"format": {
"commands": [
("dotnet format", ["dotnet", "format", "--verify-no-changes"]),
],
"default": "dotnet format",
},
"test": {
"commands": [
("dotnet test", ["dotnet", "test", "--no-build"]),
],
"default": "dotnet test",
},
},
"php": {
"extensions": [".php"],
"lint": {
"commands": [
("phpcs", ["./vendor/bin/phpcs", "--standard=PSR12", "src/"]),
("phpstan", ["./vendor/bin/phpstan", "analyse", "src/", "--level=max"]),
],
"default": "phpcs",
},
"typecheck": {
"commands": [
("php -l", ["php", "-l", "src/"]),
],
"default": "php -l",
},
"format": {
"commands": [
("phpcbf", ["./vendor/bin/phpcbf", "--standard=PSR12", "src/"]),
],
"default": "phpcbf",
},
"test": {
"commands": [
("phpunit", ["./vendor/bin/phpunit"]),
("pest", ["./vendor/bin/pest"]),
],
"default": "phpunit",
},
},
}
# =============================================================================
# Data Structures
# =============================================================================
@dataclass
class CheckResult:
"""Result of a single check."""
name: str
success: bool
command: str
output: str
duration_ms: float
error_count: int = 0
warning_count: int = 0
@dataclass
class ReviewResult:
"""Result of a full code review."""
language: str
paths: List[str]
success: bool
checks: List[CheckResult]
total_duration_ms: float
summary: str
blocked: bool = False # True if CRITICAL issues found
def to_dict(self) -> Dict:
return {
"language": self.language,
"paths": self.paths,
"success": self.success,
"blocked": self.blocked,
"total_duration_ms": self.total_duration_ms,
"summary": self.summary,
"checks": [
{
"name": c.name,
"success": c.success,
"command": c.command,
"error_count": c.error_count,
"warning_count": c.warning_count,
"output": c.output[:500] if len(c.output) > 500 else c.output,
}
for c in self.checks
],
}
# =============================================================================
# Core Functions
# =============================================================================
def detect_language(paths: List[str]) -> Optional[str]:
"""Detect language from file extensions."""
extension_map: Dict[str, str] = {}
for lang, config in LANGUAGE_CONFIG.items():
for ext in config.get("extensions", []):
extension_map[ext] = lang
detected = set()
for path in paths:
p = Path(path)
if p.is_file():
ext = p.suffix
if ext in extension_map:
detected.add(extension_map[ext])
elif p.is_dir():
for ext, lang in extension_map.items():
if list(p.rglob(f"*{ext}")):
detected.add(lang)
if len(detected) == 1:
return list(detected)[0]
elif len(detected) > 1:
# Prefer more specific languages
priority = ["typescript", "javascript", "python", "rust", "go", "java", "cpp", "csharp", "php"]
for lang in priority:
if lang in detected:
return lang
return None
def run_command(cmd: List[str], timeout: int = 120) -> Tuple[int, str, str]:
"""Run a command and return (returncode, stdout, stderr)."""
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=timeout,
cwd=os.getcwd(),
)
return result.returncode, result.stdout, result.stderr
except subprocess.TimeoutExpired:
return -1, "", "Command timed out"
except FileNotFoundError:
return -2, "", f"Command not found: {cmd[0]}"
except Exception as e:
return -3, "", str(e)
def parse_check_output(name: str, output: str, returncode: int) -> Tuple[bool, int, int]:
"""Parse check output to determine success and counts."""
if returncode == -2:
# Command not found - not an error
return True, 0, 0
if returncode == 0:
return True, 0, 0
# Try to parse error/warning counts from output
error_count = 0
warning_count = 0
# Common patterns
output_lower = output.lower()
if "error" in output_lower:
import re
errors = re.findall(r'\b(\d+)\s+error', output_lower)
if errors:
error_count = sum(int(e) for e in errors)
if "warning" in output_lower:
warnings = re.findall(r'\b(\d+)\s+warning', output_lower)
if warnings:
warning_count = sum(int(w) for w in warnings)
success = returncode == 0
return success, error_count, warning_count
def run_check(
language: str,
check_type: str,
paths: List[str],
tool_name: Optional[str] = None,
) -> CheckResult:
"""Run a single check for a language."""
import time
if language not in LANGUAGE_CONFIG:
return CheckResult(
name=f"{check_type}",
success=False,
command="",
output=f"Unknown language: {language}",
duration_ms=0,
)
config = LANGUAGE_CONFIG[language]
check_config = config.get(check_type)
if not check_config:
return CheckResult(
name=f"{check_type}",
success=True,
command="",
output=f"No {check_type} configured for {language}",
duration_ms=0,
)
commands = check_config.get("commands", [])
if not commands:
return CheckResult(
name=f"{check_type}",
success=True,
command="",
output=f"No commands for {check_type}",
duration_ms=0,
)
# Find the requested tool or use default
if tool_name:
cmd_list = [c for c in commands if c[0] == tool_name]
if not cmd_list:
return CheckResult(
name=f"{check_type}",
success=False,
command="",
output=f"Tool {tool_name} not found for {check_type}",
duration_ms=0,
)
cmd = cmd_list[0]
else:
# Use default tool
default_name = check_config.get("default", commands[0][0])
cmd = next((c for c in commands if c[0] == default_name), commands[0])
# Expand paths
expanded_cmd = cmd[1]
if paths:
# Add paths to command if it doesn't already have them
if "." in expanded_cmd or any("$" not in c for c in expanded_cmd):
expanded_cmd = expanded_cmd + paths
start_time = time.time()
returncode, stdout, stderr = run_command(expanded_cmd)
duration_ms = (time.time() - start_time) * 1000
output = stdout if stdout else stderr
success, error_count, warning_count = parse_check_output(cmd[0], output, returncode)
return CheckResult(
name=f"{check_type}:{cmd[0]}",
success=success,
command=" ".join(expanded_cmd),
output=output,
duration_ms=duration_ms,
error_count=error_count,
warning_count=warning_count,
)
def run_code_review(
language: Optional[str] = None,
paths: Optional[List[str]] = None,
check_types: Optional[List[str]] = None,
tools: Optional[Dict[str, str]] = None,
) -> ReviewResult:
"""
Run a full code review.
Args:
language: Language to check (auto-detected if not provided)
paths: Files/directories to check (defaults to current directory)
check_types: Types of checks to run (defaults to lint, typecheck)
tools: Override specific tools, e.g., {"lint": "flake8", "test": "pytest"}
Returns:
ReviewResult with all check results
"""
import time
if paths is None:
paths = ["."]
if language is None:
language = detect_language(paths) or "unknown"
if check_types is None:
check_types = ["lint", "typecheck"]
if tools is None:
tools = {}
start_time = time.time()
checks = []
total_errors = 0
total_warnings = 0
for check_type in check_types:
tool_name = tools.get(check_type)
result = run_check(language, check_type, paths, tool_name)
checks.append(result)
total_errors += result.error_count
total_warnings += result.warning_count
total_duration_ms = (time.time() - start_time) * 1000
# Determine overall success and blocked status
all_passed = all(c.success for c in checks)
has_critical = total_errors > 0
summary = f"{len(checks)} checks, {total_errors} errors, {total_warnings} warnings"
return ReviewResult(
language=language,
paths=paths,
success=all_passed,
checks=checks,
total_duration_ms=total_duration_ms,
summary=summary,
blocked=has_critical and check_types == ["lint"],
)
# =============================================================================
# Tool Entry Point
# =============================================================================
def code_review_tool(
language: str = None,
paths: str = None,
check_types: str = None,
tools: str = None,
) -> str:
"""
Run code quality checks for a language.
Args:
language: Language to check (python, javascript, typescript, go, rust, java, cpp, csharp, php)
paths: Comma-separated list of files/directories to check (default: current directory)
check_types: Comma-separated check types (lint, typecheck, format, test, security)
tools: JSON string mapping check types to specific tools, e.g., '{"lint": "ruff"}'
Returns:
JSON string with ReviewResult
"""
# Parse inputs
lang = language.lower() if language else None
path_list = [p.strip() for p in paths.split(",")] if paths else ["."]
check_list = [c.strip() for c in check_types.split(",")] if check_types else ["lint", "typecheck"]
tool_map = json.loads(tools) if tools else {}
# Run review
result = run_code_review(
language=lang,
paths=path_list,
check_types=check_list,
tools=tool_map,
)
return json.dumps(result.to_dict(), indent=2)
if __name__ == "__main__":
import sys
if len(sys.argv) < 2:
print("Usage: python reviewer_tool.py <language> [paths] [check_types]")
sys.exit(1)
language = sys.argv[1]
paths = sys.argv[2:3] if len(sys.argv) > 2 else ["."]
check_types = sys.argv[3:4] if len(sys.argv) > 3 else ["lint"]
result = run_code_review(
language=language,
paths=paths,
check_types=check_types,
)
print(json.dumps(result.to_dict(), indent=2))
# =============================================================================
# Registry Registration
# =============================================================================
from tools.registry import registry
registry.register(
name="code_review",
toolset="reviewer",
schema={
"name": "code_review",
"description": "Run code quality checks (lint, typecheck, test) for a specific language. "
"Supports Python, JavaScript, TypeScript, Go, Rust, Java, C++, C#, PHP. "
"Returns structured JSON with check results, error counts, and blocked status.",
"parameters": {
"type": "object",
"properties": {
"language": {
"type": "string",
"description": "Language to check (python, javascript, typescript, go, rust, java, cpp, csharp, php). "
"Auto-detected if not provided.",
},
"paths": {
"type": "string",
"description": "Comma-separated list of files/directories to check. Defaults to current directory.",
},
"check_types": {
"type": "string",
"description": "Comma-separated check types: lint, typecheck, format, test, security. "
"Defaults to 'lint,typecheck'.",
},
"tools": {
"type": "string",
"description": "JSON string mapping check types to specific tools, e.g., '{\"lint\": \"ruff\", \"test\": \"pytest\"}'",
},
},
},
},
handler=lambda args, **kw: code_review_tool(
language=args.get("language"),
paths=args.get("paths"),
check_types=args.get("check_types"),
tools=args.get("tools"),
),
check_fn=lambda: True, # Always available
requires_env=[],
description="Run code quality checks for a language",
emoji="🔍",
)

View File

@@ -34,7 +34,7 @@ _HERMES_CORE_TOOLS = [
# Terminal + process management
"terminal", "process",
# File manipulation
"read_file", "write_file", "patch", "search_files",
"read_file", "write_file", "patch", "search_files", "hooks",
# Vision + image generation
"vision_analyze", "image_generate",
# Skills
@@ -56,6 +56,8 @@ _HERMES_CORE_TOOLS = [
"execute_code", "delegate_task",
# Cronjob management
"cronjob",
# Code review
"code_review",
# Cross-platform messaging (gated on gateway running via check_fn)
"send_message",
# Home Assistant smart home control (gated on HASS_TOKEN via check_fn)