diff --git a/agent/hook_profile.py b/agent/hook_profile.py new file mode 100644 index 00000000..a269ebea --- /dev/null +++ b/agent/hook_profile.py @@ -0,0 +1,525 @@ +""" +Hook Profile System + +Manages different security profiles for command hooks. +Users can switch between profiles to adjust the level of +security enforcement. + +Inspired by gstack's hook patterns (careful/freeze/guard). + +Usage: + from agent.hook_profile import HookProfileManager, get_profile_manager + + # Get the manager + manager = get_profile_manager() + + # Get current profile + profile = manager.get_active_profile() + + # Switch to strict mode + manager.switch_profile("strict") + + # Check if a command would be blocked + result = manager.check_command("rm -rf /") +""" + +import yaml +import logging +import subprocess +from pathlib import Path +from typing import Dict, List, Optional, Any, Callable +from dataclasses import dataclass, field +from enum import Enum + +from hermes_constants import get_hermes_home + +logger = logging.getLogger(__name__) + + +class ProfileLevel(Enum): + """Hook security profile levels.""" + MINIMAL = "minimal" + STANDARD = "standard" + STRICT = "strict" + + def __str__(self): + return self.value + + +@dataclass +class HookDefinition: + """Definition of a hook.""" + name: str + hook_type: str # "pre-command" or "post-write" + script_path: str + enabled: bool = True + config_path: Optional[str] = None # Optional YAML config + description: str = "" + blocks_on_match: bool = False # Does this hook block commands? + + @property + def full_path(self) -> Path: + """Get the full path to the hook script.""" + return Path.home() / ".hermes" / "hooks" / self.hook_type / self.script_path + + +@dataclass +class HookProfile: + """A named hook profile with associated hooks.""" + name: str + level: ProfileLevel + description: str + hooks: List[str] # Hook names to enable + env_overrides: Dict[str, str] = field(default_factory=dict) + auto_activate: bool = False # Auto-activate based on context + + @property + def hook_definitions(self) -> List[HookDefinition]: + """Get HookDefinition objects for this profile's hooks.""" + return [BUILTIN_HOOKS.get(h) for h in self.hooks if h in BUILTIN_HOOKS] + + +# ============================================================================= +# Built-in Hook Profiles +# ============================================================================= + +BUILTIN_HOOKS: Dict[str, HookDefinition] = { + "careful": HookDefinition( + name="careful", + hook_type="pre-command", + script_path="careful-hook.sh", + blocks_on_match=False, + description="Warn about dangerous commands but allow execution", + ), + "freeze": HookDefinition( + name="freeze", + hook_type="pre-command", + script_path="freeze-hook.sh", + blocks_on_match=True, + description="Block dangerous commands entirely", + ), + "guard": HookDefinition( + name="guard", + hook_type="pre-command", + script_path="guard-hook.sh", + blocks_on_match=False, + description="Context-aware guard with learning", + ), + "rust-fmt": HookDefinition( + name="rust-fmt", + hook_type="post-write", + script_path="rust-fmt-on-rs.yaml", + blocks_on_match=False, + description="Format Rust code on save", + ), + "cargo-check": HookDefinition( + name="cargo-check", + hook_type="post-write", + script_path="cargo-check-on-rs.yaml", + blocks_on_match=True, + description="Check Rust code on save", + ), + "go-fmt": HookDefinition( + name="go-fmt", + hook_type="post-write", + script_path="go-fmt-on-go.yaml", + blocks_on_match=False, + description="Format Go code on save", + ), + "ruff-lint": HookDefinition( + name="ruff-lint", + hook_type="post-write", + script_path="ruff-lint-py.yaml", + blocks_on_match=True, + description="Lint Python code on save with ruff", + ), + "pytest": HookDefinition( + name="pytest", + hook_type="post-write", + script_path="pytest-on-py.yaml", + blocks_on_match=True, + description="Run pytest on Python file change", + ), +} + + +BUILTIN_PROFILES: Dict[str, HookProfile] = { + "minimal": HookProfile( + name="minimal", + level=ProfileLevel.MINIMAL, + description="No security hooks - use with caution", + hooks=[], # No hooks + env_overrides={"HERMES_HOOKS_ENABLED": "false"}, + ), + "standard": HookProfile( + name="standard", + level=ProfileLevel.STANDARD, + description="Warning-only hooks for dangerous commands", + hooks=["careful"], + env_overrides={"HERMES_HOOKS_ENABLED": "true"}, + auto_activate=False, + ), + "strict": HookProfile( + name="strict", + level=ProfileLevel.STRICT, + description="Block dangerous commands - recommended for production", + hooks=["careful", "freeze"], + env_overrides={"HERMES_HOOKS_ENABLED": "true"}, + auto_activate=False, + ), + "developer": HookProfile( + name="developer", + level=ProfileLevel.STANDARD, + description="Standard hooks + code quality checks (Rust/Go/Python)", + hooks=["careful", "ruff-lint", "pytest", "go-fmt"], + env_overrides={"HERMES_HOOKS_ENABLED": "true"}, + auto_activate=False, + ), + "rust-dev": HookProfile( + name="rust-dev", + level=ProfileLevel.STANDARD, + description="Rust development profile with formatting and checks", + hooks=["careful", "rust-fmt", "cargo-check"], + env_overrides={"HERMES_HOOKS_ENABLED": "true"}, + auto_activate=False, + ), +} + + +# ============================================================================= +# Hook Profile Manager +# ============================================================================= + +class HookProfileManager: + """ + Manages hook security profiles. + + Allows switching between different hook configurations + to adjust the level of security enforcement. + """ + + def __init__(self, config_path: Optional[Path] = None): + """ + Initialize the hook profile manager. + + Args: + config_path: Path to store profile config. + Defaults to ~/.hermes/hook-profile.yaml + """ + if config_path is None: + config_path = get_hermes_home() / "hook-profile.yaml" + + self.config_path = config_path + self._active_profile_name: Optional[str] = "standard" + self._custom_profiles: Dict[str, HookProfile] = {} + self._load() + + def _load(self) -> None: + """Load profile configuration from disk.""" + if self.config_path.exists(): + try: + data = yaml.safe_load(self.config_path.read_text()) + self._active_profile_name = data.get("active_profile", "standard") + + # Load custom profiles + custom = data.get("custom_profiles", {}) + for name, profile_data in custom.items(): + self._custom_profiles[name] = HookProfile( + name=profile_data.get("name", name), + level=ProfileLevel(profile_data.get("level", "standard")), + description=profile_data.get("description", ""), + hooks=profile_data.get("hooks", []), + env_overrides=profile_data.get("env_overrides", {}), + auto_activate=profile_data.get("auto_activate", False), + ) + + logger.info(f"Loaded hook profile config: active={self._active_profile_name}") + except Exception as e: + logger.warning(f"Failed to load hook profile config: {e}") + self._active_profile_name = "standard" + + def _save(self) -> None: + """Save profile configuration to disk.""" + try: + data = { + "active_profile": self._active_profile_name, + "custom_profiles": {}, + } + + for name, profile in self._custom_profiles.items(): + data["custom_profiles"][name] = { + "name": profile.name, + "level": profile.level.value, + "description": profile.description, + "hooks": profile.hooks, + "env_overrides": profile.env_overrides, + "auto_activate": profile.auto_activate, + } + + self.config_path.parent.mkdir(parents=True, exist_ok=True) + self.config_path.write_text(yaml.dump(data)) + logger.debug(f"Saved hook profile config") + except Exception as e: + logger.error(f"Failed to save hook profile config: {e}") + + def get_active_profile(self) -> HookProfile: + """Get the currently active profile.""" + if self._active_profile_name in BUILTIN_PROFILES: + return BUILTIN_PROFILES[self._active_profile_name] + if self._active_profile_name in self._custom_profiles: + return self._custom_profiles[self._active_profile_name] + + # Fallback to standard + return BUILTIN_PROFILES["standard"] + + def get_profile(self, name: str) -> Optional[HookProfile]: + """Get a profile by name.""" + if name in BUILTIN_PROFILES: + return BUILTIN_PROFILES[name] + return self._custom_profiles.get(name) + + def list_profiles(self) -> List[str]: + """List all available profile names.""" + profiles = list(BUILTIN_PROFILES.keys()) + profiles.extend(self._custom_profiles.keys()) + return profiles + + def switch_profile(self, name: str) -> bool: + """ + Switch to a different hook profile. + + Args: + name: Profile name to switch to + + Returns: + True if switch successful, False if profile doesn't exist + """ + if name not in BUILTIN_PROFILES and name not in self._custom_profiles: + logger.warning(f"Hook profile '{name}' not found") + return False + + old_profile = self._active_profile_name + self._active_profile_name = name + self._save() + + logger.info(f"Switched hook profile: {old_profile} → {name}") + return True + + def check_command(self, command: str, full_command: Optional[str] = None) -> Dict[str, Any]: + """ + Check a command against the active profile's hooks. + + Args: + command: The command to check + full_command: Optional full command with args + + Returns: + Dict with keys: + - allowed: bool + - blocked: bool + - warned: bool + - messages: List[str] + - hook_name: str or None + """ + if full_command is None: + full_command = command + + profile = self.get_active_profile() + + result = { + "allowed": True, + "blocked": False, + "warned": False, + "messages": [], + "hook_name": None, + } + + for hook_name in profile.hooks: + hook_def = BUILTIN_HOOKS.get(hook_name) + if not hook_def or not hook_def.enabled: + continue + + hook_result = self._run_hook(hook_def, command, full_command) + + if hook_result is None: + continue + + if hook_result["blocked"]: + result["blocked"] = True + result["allowed"] = False + result["hook_name"] = hook_name + result["messages"].extend(hook_result["messages"]) + break + + if hook_result["warned"]: + result["warned"] = True + result["messages"].extend(hook_result["messages"]) + + return result + + def _run_hook(self, hook: HookDefinition, command: str, full_command: str) -> Optional[Dict[str, Any]]: + """ + Run a single hook and return the result. + + Args: + hook: Hook definition + command: Short command (first word) + full_command: Full command string + + Returns: + Dict with blocked/warned/messages, or None if hook not found + """ + if not hook.full_path.exists(): + logger.warning(f"Hook script not found: {hook.full_path}") + return None + + try: + proc = subprocess.run( + [str(hook.full_path), command, full_command], + capture_output=True, + text=True, + timeout=5, + ) + + output = proc.stdout.strip() + stderr = proc.stderr.strip() + + if output: + messages = output.split("\n") + else: + messages = [] + + if stderr: + messages.append(f"STDERR: {stderr}") + + return { + "blocked": proc.returncode != 0 and hook.blocks_on_match, + "warned": proc.returncode == 0 and bool(output), + "messages": messages, + } + except subprocess.TimeoutExpired: + logger.error(f"Hook timed out: {hook.name}") + return {"blocked": False, "warned": True, "messages": ["Hook timed out"]} + except Exception as e: + logger.error(f"Hook failed: {hook.name}: {e}") + return {"blocked": False, "warned": True, "messages": [f"Hook error: {e}"]} + + def add_profile(self, profile: HookProfile) -> None: + """ + Add a custom profile. + + Args: + profile: Profile to add + """ + self._custom_profiles[profile.name] = profile + self._save() + + def remove_profile(self, name: str) -> bool: + """ + Remove a custom profile. + + Args: + name: Profile name to remove + + Returns: + True if removed, False if not found or is builtin + """ + if name in BUILTIN_PROFILES: + logger.warning(f"Cannot remove builtin profile: {name}") + return False + + if name in self._custom_profiles: + del self._custom_profiles[name] + if self._active_profile_name == name: + self._active_profile_name = "standard" + self._save() + return True + + return False + + def auto_select_profile(self, context: str) -> HookProfile: + """ + Automatically select profile based on context. + + Args: + context: Conversation context + + Returns: + Selected profile + """ + context_lower = context.lower() + + # Check for keywords that suggest different profiles + if any(kw in context_lower for kw in ["production", "prod", "deploy", "release"]): + self.switch_profile("strict") + return self.get_active_profile() + + if any(kw in context_lower for kw in ["develop", "dev", "local", "test"]): + self.switch_profile("developer") + return self.get_active_profile() + + if any(kw in context_lower for kw in ["rust", "cargo", "rs "]): + self.switch_profile("rust-dev") + return self.get_active_profile() + + # Default to current profile + return self.get_active_profile() + + def get_enabled_hooks(self) -> List[HookDefinition]: + """Get list of enabled hooks for the active profile.""" + profile = self.get_active_profile() + enabled = [] + + for hook_name in profile.hooks: + hook_def = BUILTIN_HOOKS.get(hook_name) + if hook_def: + enabled.append(hook_def) + + return enabled + + +# ============================================================================= +# Singleton +# ============================================================================= + +_hook_profile_manager: Optional[HookProfileManager] = None + + +def get_profile_manager() -> HookProfileManager: + """Get the singleton HookProfileManager instance.""" + global _hook_profile_manager + if _hook_profile_manager is None: + _hook_profile_manager = HookProfileManager() + return _hook_profile_manager + + +def check_command(command: str, full_command: Optional[str] = None) -> Dict[str, Any]: + """ + Check a command against the active profile's hooks. + + Args: + command: The command to check + full_command: Optional full command with args + + Returns: + Dict with allowed/blocked/warned/messages + """ + return get_profile_manager().check_command(command, full_command) + + +def switch_hook_profile(name: str) -> bool: + """ + Switch to a different hook profile. + + Args: + name: Profile name + + Returns: + True if successful + """ + return get_profile_manager().switch_profile(name) + + +def get_active_hook_profile() -> HookProfile: + """Get the currently active hook profile.""" + return get_profile_manager().get_active_profile() diff --git a/agent/instinct.py b/agent/instinct.py new file mode 100644 index 00000000..4809a903 --- /dev/null +++ b/agent/instinct.py @@ -0,0 +1,508 @@ +""" +Instinct Learning System + +Learns from successful interactions to improve tool/skill loading. +Tracks patterns of what tools and skills are used together, +and uses these patterns to predict future loading needs. + +Inspired by ECC's experience tracking. + +Usage: + from agent.instinct import InstinctLearner, get_instinct_learner + + # Get the learner singleton + learner = get_instinct_learner() + + # Record a successful interaction + learner.record_interaction(context="debug python code", tools=["terminal", "read_file"], skills=["systematic-debugging"]) + + # Get predicted skills for context + predictions = learner.predict_for_context("my python is broken") +""" + +import json +import logging +import time +from pathlib import Path +from typing import Dict, List, Optional, Set, Tuple, Any +from dataclasses import dataclass, field, asdict +from collections import defaultdict +from functools import lru_cache + +from hermes_constants import get_hermes_home + +logger = logging.getLogger(__name__) + + +# ============================================================================= +# Data Structures +# ============================================================================= + +@dataclass +class InteractionRecord: + """Record of a successful interaction.""" + timestamp: float + context: str + context_hash: str # For quick lookup + tools: List[str] + skills: List[str] + success: bool = True + feedback: Optional[str] = None # Optional user feedback + + @classmethod + def create(cls, context: str, tools: List[str], skills: List[str], + feedback: Optional[str] = None) -> "InteractionRecord": + import hashlib + context_hash = hashlib.md5(context.lower().encode()).hexdigest()[:12] + return cls( + timestamp=time.time(), + context=context, + context_hash=context_hash, + tools=sorted(tools), + skills=sorted(skills), + feedback=feedback, + ) + + +@dataclass +class PatternStats: + """Statistics for a context pattern.""" + count: int = 0 + total_success: int = 0 + avg_tools: float = 0.0 + avg_skills: float = 0.0 + last_used: float = 0.0 + + # Tool/skill co-occurrence counts + tool_counts: Dict[str, int] = field(default_factory=dict) + skill_counts: Dict[str, int] = field(default_factory=dict) + + @property + def success_rate(self) -> float: + if self.count == 0: + return 0.0 + return self.total_success / self.count + + def to_dict(self) -> Dict: + return { + "count": self.count, + "total_success": self.total_success, + "avg_tools": self.avg_tools, + "avg_skills": self.avg_skills, + "last_used": self.last_used, + "tool_counts": self.tool_counts, + "skill_counts": self.skill_counts, + } + + @classmethod + def from_dict(cls, data: Dict) -> "PatternStats": + return cls( + count=data.get("count", 0), + total_success=data.get("total_success", 0), + avg_tools=data.get("avg_tools", 0.0), + avg_skills=data.get("avg_skills", 0.0), + last_used=data.get("last_used", 0.0), + tool_counts=data.get("tool_counts", {}), + skill_counts=data.get("skill_counts", {}), + ) + + +@dataclass +class InstinctProfile: + """ + Learned instincts for a context pattern. + + Created by aggregating InteractionRecords. + """ + pattern: str # The context pattern/key + tools: List[str] # Most commonly used tools + skills: List[str] # Most commonly used skills + confidence: float # 0-1 based on sample size + sample_size: int + last_used: float + + def to_dict(self) -> Dict: + return { + "pattern": self.pattern, + "tools": self.tools, + "skills": self.skills, + "confidence": self.confidence, + "sample_size": self.sample_size, + "last_used": self.last_used, + } + + +# ============================================================================= +# Instinct Learner +# ============================================================================= + +class InstinctLearner: + """ + Learns from interactions to predict tool/skill loading. + + Tracks patterns and builds instincts that can be used + to preload tools and skills before they're explicitly needed. + """ + + def __init__(self, storage_path: Optional[Path] = None): + """ + Initialize the instinct learner. + + Args: + storage_path: Path to store interaction data. + Defaults to ~/.hermes/instinct/ + """ + if storage_path is None: + storage_path = get_hermes_home() / "instinct" + + self.storage_path = storage_path + self.storage_path.mkdir(parents=True, exist_ok=True) + + self.interactions_file = storage_path / "interactions.jsonl" + self.patterns_file = storage_path / "patterns.json" + self.instincts_file = storage_path / "instincts.json" + + # In-memory cache + self._patterns: Dict[str, PatternStats] = {} + self._instincts: Dict[str, InstinctProfile] = {} + self._context_hints: Dict[str, List[str]] = {} # hint -> [pattern] + + # Load existing data + self._load() + + def _load(self) -> None: + """Load patterns and instincts from disk.""" + # Load patterns + if self.patterns_file.exists(): + try: + data = json.loads(self.patterns_file.read_text()) + self._patterns = { + k: PatternStats.from_dict(v) for k, v in data.items() + } + logger.info(f"Loaded {len(self._patterns)} patterns") + except Exception as e: + logger.warning(f"Failed to load patterns: {e}") + + # Load instincts + if self.instincts_file.exists(): + try: + data = json.loads(self.instincts_file.read_text()) + self._instincts = { + k: InstinctProfile(**v) for k, v in data.items() + } + logger.info(f"Loaded {len(self._instincts)} instincts") + except Exception as e: + logger.warning(f"Failed to load instincts: {e}") + + def _save(self) -> None: + """Save patterns and instincts to disk.""" + try: + # Save patterns + patterns_data = {k: v.to_dict() for k, v in self._patterns.items()} + self.patterns_file.write_text(json.dumps(patterns_data, indent=2)) + + # Save instincts + instincts_data = {k: v.to_dict() for k, v in self._instincts.items()} + self.instincts_file.write_text(json.dumps(instincts_data, indent=2)) + + logger.debug(f"Saved {len(self._patterns)} patterns, {len(self._instincts)} instincts") + except Exception as e: + logger.error(f"Failed to save instinct data: {e}") + + def record_interaction( + self, + context: str, + tools: List[str], + skills: List[str], + success: bool = True, + feedback: Optional[str] = None, + ) -> None: + """ + Record a successful interaction. + + Args: + context: The conversation context + tools: Tools that were used + skills: Skills that were loaded + success: Whether the interaction was successful + feedback: Optional user feedback + """ + import re + + # Create interaction record + record = InteractionRecord.create(context, tools, skills, feedback) + + # Append to interactions log + with open(self.interactions_file, "a") as f: + f.write(json.dumps(asdict(record)) + "\n") + + # Extract context hints (keywords) + hints = self._extract_hints(context) + + # Update patterns for each hint + for hint in hints: + if hint not in self._patterns: + self._patterns[hint] = PatternStats() + + stats = self._patterns[hint] + stats.count += 1 + if success: + stats.total_success += 1 + + # Update running averages + stats.avg_tools = (stats.avg_tools * (stats.count - 1) + len(tools)) / stats.count + stats.avg_skills = (stats.avg_skills * (stats.count - 1) + len(skills)) / stats.count + stats.last_used = record.timestamp + + # Update co-occurrence counts + for tool in tools: + stats.tool_counts[tool] = stats.tool_counts.get(tool, 0) + 1 + for skill in skills: + stats.skill_counts[skill] = stats.skill_counts.get(skill, 0) + 1 + + # Update hint index + for hint in hints: + if hint not in self._context_hints: + self._context_hints[hint] = [] + if record.context_hash not in self._context_hints[hint]: + self._context_hints[hint].append(record.context_hash) + + # Rebuild instincts for affected patterns + self._rebuild_instincts(hints) + + # Save periodically (every 10 interactions) + if sum(p.count for p in self._patterns.values()) % 10 == 0: + self._save() + + def _extract_hints(self, context: str) -> List[str]: + """Extract meaningful hints from context.""" + import re + + context_lower = context.lower() + + # Extract words (3+ chars) + words = re.findall(r'\b\w{3,}\b', context_lower) + + # Filter out common stop words + stop_words = { + 'the', 'and', 'for', 'are', 'but', 'not', 'you', 'all', 'can', + 'her', 'was', 'one', 'our', 'out', 'day', 'get', 'has', 'him', + 'his', 'how', 'its', 'may', 'new', 'now', 'old', 'see', 'two', + 'who', 'boy', 'did', 'she', 'use', 'way', 'will', 'with', 'have', + 'this', 'that', 'from', 'they', 'what', 'were', 'been', 'when', + 'would', 'there', 'could', 'their', 'which', 'about', 'after', + 'before', 'between', 'into', 'over', 'under', 'again', 'more', + 'some', 'such', 'only', 'just', 'also', 'very', 'than', 'then', + 'them', 'these', 'those', 'here', 'where', 'why', 'how', 'your', + } + + hints = [w for w in words if w not in stop_words] + + # Add bigrams for common patterns + bigrams = [] + for i in range(len(words) - 1): + bigram = f"{words[i]}_{words[i+1]}" + bigrams.append(bigram) + + hints.extend(bigrams) + + return hints[:20] # Limit to top 20 hints + + def _rebuild_instincts(self, hints: List[str]) -> None: + """ + Rebuild instincts for the given hints. + + An instinct is a prediction for what tools/skills to load + based on learned patterns. + """ + for hint in hints: + stats = self._patterns.get(hint) + if not stats or stats.count < 2: + continue + + # Calculate confidence based on sample size + confidence = min(1.0, stats.count / 10) # Max confidence at 10 samples + + # Get top tools (appearing in >50% of interactions) + threshold = stats.count * 0.5 + top_tools = [t for t, c in stats.tool_counts.items() if c >= threshold] + + # Get top skills + top_skills = [s for s, c in stats.skill_counts.items() if c >= threshold] + + if top_tools or top_skills: + self._instincts[hint] = InstinctProfile( + pattern=hint, + tools=top_tools[:5], # Max 5 tools + skills=top_skills[:5], # Max 5 skills + confidence=confidence, + sample_size=stats.count, + last_used=stats.last_used, + ) + + def predict_for_context(self, context: str, limit: int = 10) -> List[str]: + """ + Predict what to load for a context. + + Args: + context: The conversation context + limit: Maximum number of predictions + + Returns: + List of tool/skill paths to preload + """ + hints = self._extract_hints(context) + + # Aggregate predictions from matching hints + tool_scores: Dict[str, float] = defaultdict(float) + skill_scores: Dict[str, float] = defaultdict(float) + + for hint in hints: + instinct = self._instincts.get(hint) + if not instinct: + continue + + # Weight by confidence + weight = instinct.confidence + + for tool in instinct.tools: + tool_scores[tool] += weight + for skill in instinct.skills: + skill_scores[skill] += weight + + # Combine and sort + predictions = [] + for tool, score in sorted(tool_scores.items(), key=lambda x: -x[1])[:limit]: + predictions.append(tool) + for skill, score in sorted(skill_scores.items(), key=lambda x: -x[1])[:limit]: + if skill not in predictions: + predictions.append(skill) + + return predictions[:limit] + + def get_instinct_for_hint(self, hint: str) -> Optional[InstinctProfile]: + """Get the instinct for a specific hint.""" + return self._instincts.get(hint) + + def get_stats(self) -> Dict[str, Any]: + """Get learning statistics.""" + total_interactions = sum(p.count for p in self._patterns.values()) + return { + "total_patterns": len(self._patterns), + "total_instincts": len(self._instincts), + "total_interactions": total_interactions, + "avg_tools_per_interaction": sum(p.avg_tools for p in self._patterns.values()) / max(1, len(self._patterns)), + "avg_skills_per_interaction": sum(p.avg_skills for p in self._patterns.values()) / max(1, len(self._patterns)), + } + + def clear_old_data(self, days: int = 30) -> int: + """ + Clear pattern data older than specified days. + + Args: + days: Number of days to retain + + Returns: + Number of patterns removed + """ + import time + cutoff = time.time() - (days * 86400) + + removed = 0 + for pattern in list(self._patterns.keys()): + if self._patterns[pattern].last_used < cutoff: + del self._patterns[pattern] + if pattern in self._instincts: + del self._instincts[pattern] + removed += 1 + + if removed > 0: + self._save() + + return removed + + def export_instincts(self) -> Dict[str, Any]: + """Export all instincts for sharing/backup.""" + return { + "version": "1.0.0", + "exported_at": time.time(), + "instincts": {k: v.to_dict() for k, v in self._instincts.items()}, + "patterns": {k: v.to_dict() for k, v in self._patterns.items()}, + } + + def import_instincts(self, data: Dict) -> int: + """ + Import instincts from backup. + + Args: + data: Exported instinct data + + Returns: + Number of instincts imported + """ + count = 0 + + for k, v in data.get("instincts", {}).items(): + try: + self._instincts[k] = InstinctProfile(**v) + count += 1 + except Exception as e: + logger.warning(f"Failed to import instinct {k}: {e}") + + for k, v in data.get("patterns", {}).items(): + try: + self._patterns[k] = PatternStats.from_dict(v) + except Exception as e: + logger.warning(f"Failed to import pattern {k}: {e}") + + if count > 0: + self._save() + + return count + + +# ============================================================================= +# Singleton +# ============================================================================= + +_instinct_learner: Optional[InstinctLearner] = None + + +def get_instinct_learner() -> InstinctLearner: + """Get the singleton InstinctLearner instance.""" + global _instinct_learner + if _instinct_learner is None: + _instinct_learner = InstinctLearner() + return _instinct_learner + + +def record_success( + context: str, + tools: List[str], + skills: List[str], + feedback: Optional[str] = None, +) -> None: + """ + Convenience function to record a successful interaction. + + Args: + context: The conversation context + tools: Tools that were used + skills: Skills that were loaded + feedback: Optional user feedback + """ + get_instinct_learner().record_interaction(context, tools, skills, success=True, feedback=feedback) + + +def predict_loading(context: str, limit: int = 10) -> List[str]: + """ + Predict what to load for a context. + + Args: + context: The conversation context + limit: Maximum number of predictions + + Returns: + List of tool/skill paths to preload + """ + return get_instinct_learner().predict_for_context(context, limit) diff --git a/agent/lazy_loader.py b/agent/lazy_loader.py new file mode 100644 index 00000000..603a65eb --- /dev/null +++ b/agent/lazy_loader.py @@ -0,0 +1,648 @@ +""" +Context-Aware Lazy Tool Loader + +Implements tiered tool loading inspired by ECC's agent compression. +Only loads tools when needed based on conversation context. + +Architecture: +- TIER_0 (Tools): Native functions from registry (terminal, file ops, etc.) +- TIER_1 (Skills): High-level skills loaded on context triggers +- TIER_2 (Rare Tools): Tools rarely needed, loaded on demand + +Usage: + from agent.lazy_loader import LazyToolLoader, get_lazy_tool_loader + + loader = get_lazy_tool_loader() + tools = loader.get_tools_for_context("I need to debug my Python code") + + # Or use the helper function + from agent.lazy_loader import get_tools_budget_aware + tools = get_tools_budget_aware(context, context_usage=0.5) +""" + +import re +import time +import logging +from typing import Dict, List, Optional, Set, Any, Callable, Tuple +from functools import lru_cache +from collections import defaultdict + +logger = logging.getLogger(__name__) + + +# ============================================================================= +# Tool Tiers (Native tools from registry) +# ============================================================================= + +# Core tools that are always loaded (Tier 0) +TIER_0_TOOLS = frozenset({ + "terminal", + "read_file", + "write_file", + "patch", + "search_files", + "skills_list", + "skill_view", + "skill_manage", + "todo", + "memory", + "session_search", + "clarify", +}) + +# Tools that are pre-loaded based on context (Tier 1) +# These are actual TOOL NAMES from the registry +TIER_1_BY_CONTEXT = { + "code:python": ["terminal", "read_file"], # Basic file ops for Python + "code:go": ["terminal", "read_file"], + "code:rust": ["terminal", "read_file"], + "code:js": ["terminal", "read_file"], + "debug": ["terminal", "read_file", "patch"], + "test": ["terminal"], + "ship": ["terminal"], + "deploy": ["terminal"], + "plan": ["skills_list", "skill_view"], + "brainstorm": ["skills_list", "skill_view"], + "web": ["web_search", "web_extract", "browser_navigate"], + "browser": ["browser_navigate", "browser_snapshot", "browser_click", "browser_vision"], + "search": ["web_search", "search_files"], + "ml": ["terminal"], + "ai": ["terminal"], + "database": ["terminal"], + "docker": ["terminal"], + "git": ["terminal"], +} + +# Tools that are always Tier 2 (loaded on demand only) +TIER_2_TOOLS = frozenset({ + "cronjob", + "text_to_speech", + "delegate_task", + "mixture_of_agents", + "execute_code", + "image_generate", + "vision_analyze", + "process", + "send_message", + "rl_start_training", + "rl_stop_training", + "rl_list_runs", + "rl_list_environments", +}) + + +# ============================================================================= +# Skill Tiers (Context-triggered skills) +# ============================================================================= + +# Skills to load on context triggers (these are NOT tools, they're skill paths) +CONTEXT_TO_SKILLS = { + "code:python": [ + "software-development/python-reviewer", + "software-development/build-error-resolver", + ], + "code:go": [ + "software-development/go-reviewer", + ], + "code:rust": [ + "software-development/rust-reviewer", + ], + "code:js": [ + "software-development/javascript-reviewer", + ], + "code:java": [ + "software-development/java-reviewer", + ], + "code:cpp": [ + "software-development/cpp-reviewer", + ], + "code:csharp": [ + "software-development/csharp-reviewer", + ], + "code:php": [ + "software-development/php-reviewer", + ], + "debug": [ + "software-development/systematic-debugging", + "software-development/build-error-resolver", + ], + "review": [ + "software-development/two-stage-review", + "software-development/python-reviewer", + ], + "test": [ + "software-development/test-driven-development", + ], + "ship": [ + "software-development/idempotent-ship", + ], + "deploy": [ + "software-development/idempotent-ship", + ], + "plan": [ + "software-development/writing-plans", + "software-development/brainstorming", + ], + "feature": [ + "software-development/writing-plans", + "software-development/brainstorming", + ], + "brainstorm": [ + "software-development/brainstorming", + ], + "install": [ + "software-development/hooks", + ], + "hook": [ + "software-development/hooks", + ], + "ml": [ + "research/llm-wiki", + "mlops/huggingface-hub", + ], + "ai": [ + "research/llm-wiki", + "mlops/huggingface-hub", + ], + "model": [ + "research/llm-wiki", + "mlops/huggingface-hub", + ], + "refactor": [ + "software-development/python-reviewer", + "software-development/two-stage-review", + ], + "performance": [ + "software-development/python-reviewer", + "software-development/systematic-debugging", + ], + "security": [ + "software-development/two-stage-review", + ], +} + + +# ============================================================================= +# Code Extensions & Keywords +# ============================================================================= + +CODE_EXTENSIONS = { + "py": "python", + "js": "javascript", + "ts": "typescript", + "go": "golang", + "rs": "rust", + "java": "java", + "cpp": "cpp", + "c": "c", + "rb": "ruby", + "php": "php", + "sh": "shell", + "bash": "shell", + "zsh": "shell", + "yaml": "yaml", + "yml": "yaml", + "json": "json", + "toml": "toml", + "md": "markdown", + "sql": "sql", + "html": "html", + "css": "css", + "cs": "csharp", +} + +DOMAIN_KEYWORDS = { + "review", "debug", "test", "ship", "deploy", "build", "compile", + "install", "setup", "config", "api", "web", "browser", "search", + "database", "docker", "kubernetes", "k8s", "cloud", "aws", "gcp", + "azure", "linux", "windows", "macos", "android", "ios", + "python", "javascript", "typescript", "go", "rust", "java", + "ml", "ai", "machine learning", "deep learning", "nlp", + "git", "github", "gitlab", "ci", "cd", "pipeline", + "refactor", "performance", "security", + "write", "create", "implement", "feature", "feature request", +} + + +# ============================================================================= +# LazyToolLoader +# ============================================================================= + +class LazyToolLoader: + """ + Context-aware tool loader with tiering and caching. + + TIER_0: Always loaded (core tools from registry) + TIER_1: Context-preloaded (tools based on detected patterns) + TIER_2: On-demand loaded (rare tools) + + Plus: Skill preloading based on context + """ + + def __init__(self): + self._tier_0_cache: Optional[List[dict]] = None + self._tier_1_cache: Dict[str, List[dict]] = {} + self._tier_2_cache: Dict[str, dict] = {} + self._context_hints: Set[str] = set() + self._active_skills: Set[str] = set() + self._skill_content_cache: Dict[str, str] = {} + self._last_context_analysis: float = 0 + self._cache_ttl: float = 60.0 + self._tools_discovered: bool = False + + def get_tools_for_context( + self, + context: str, + enabled_toolsets: Optional[List[str]] = None, + context_usage: float = 0.0, + ) -> List[dict]: + """ + Get tool definitions optimized for the current context. + + Args: + context: Conversation context (recent messages) + enabled_toolsets: Available toolsets to filter from + context_usage: Current context window usage (0.0-1.0) + + Returns: + List of OpenAI-format tool definitions + """ + self._ensure_tools_discovered() + + # Check if we need to re-analyze context + current_time = time.time() + if current_time - self._last_context_analysis > self._cache_ttl: + self._context_hints = self._analyze_context(context) + self._last_context_analysis = current_time + + tools = [] + seen = set() + + # Tier 0: Always load core tools + if self._tier_0_cache is None: + self._tier_0_cache = self._load_tier_0_tools() + tools.extend(self._tier_0_cache) + for t in self._tier_0_cache: + seen.add(t["function"]["name"]) + + # Budget-aware Tier 1 loading + if context_usage < 0.8: + tier_1_tools = self._get_tier_1_tools() + for t in tier_1_tools: + if t["function"]["name"] not in seen: + tools.append(t) + seen.add(t["function"]["name"]) + + # Budget-aware Tier 2 exclusion + # (Tier 2 tools are never preloaded, only loaded on-demand) + + return tools + + def get_active_skills(self, context: str, context_usage: float = 0.0) -> List[str]: + """ + Get list of skill paths to load for the current context. + + Args: + context: Conversation context + context_usage: Current context usage (0.0-1.0) + + Returns: + List of skill paths to load + """ + current_time = time.time() + if current_time - self._last_context_analysis > self._cache_ttl: + self._context_hints = self._analyze_context(context) + self._last_context_analysis = current_time + + skills = [] + seen = set() + + # Load skills based on context hints + for hint in self._context_hints: + if hint in CONTEXT_TO_SKILLS: + for skill_path in CONTEXT_TO_SKILLS[hint]: + if skill_path not in seen: + # Only load if budget allows + if context_usage < 0.85: + skills.append(skill_path) + seen.add(skill_path) + + self._active_skills = seen + return skills + + def get_skill_content(self, skill_path: str) -> str: + """ + Get skill content with caching. + + Args: + skill_path: Path relative to skills dir (e.g., 'software-development/python-reviewer') + + Returns: + Skill content or empty string if not found + """ + if skill_path not in self._skill_content_cache: + try: + from pathlib import Path + from hermes_constants import get_skills_dir + skill_file = get_skills_dir() / skill_path / "SKILL.md" + if skill_file.exists(): + self._skill_content_cache[skill_path] = skill_file.read_text() + else: + self._skill_content_cache[skill_path] = "" + except Exception as e: + logger.debug(f"Failed to load skill {skill_path}: {e}") + self._skill_content_cache[skill_path] = "" + return self._skill_content_cache[skill_path] + + def preload_skills_for_context( + self, + context: str, + context_usage: float = 0.0, + max_tokens: int = 8000 + ) -> str: + """ + Preload skill content for the current context. + + Returns combined skill content as a string for injection into context. + + Args: + context: Conversation context + context_usage: Current context usage + max_tokens: Maximum tokens to use for skill content + + Returns: + Combined skill content string + """ + skills = self.get_active_skills(context, context_usage) + + combined = [] + total_chars = 0 + + for skill_path in skills: + content = self.get_skill_content(skill_path) + if content: + # Rough token estimate: 4 chars per token + estimated_tokens = len(content) // 4 + if total_chars + estimated_tokens < max_tokens * 4: + combined.append(f"\n\n=== Skill: {skill_path} ===\n{content}") + total_chars += estimated_tokens + + return "\n".join(combined) + + def get_tools_from_existing( + self, + all_tools: List[dict], + context: str, + context_usage: float = 0.0, + ) -> List[dict]: + """ + Filter an existing tool list based on context. + + Args: + all_tools: Full list of tool definitions + context: Conversation context + context_usage: 0.0-1.0 context window usage + + Returns: + Filtered tool list optimized for context + """ + if context_usage > 0.9: + # Emergency mode - only tier 0 + tier_0_names = TIER_0_TOOLS + return [t for t in all_tools if t["function"]["name"] in tier_0_names] + + if context_usage > 0.8: + # High usage - tier 0 + critical tier 1 + tier_0_names = TIER_0_TOOLS + critical_tools = {"terminal", "read_file", "patch", "search_files"} + allowed = tier_0_names | critical_tools + return [t for t in all_tools if t["function"]["name"] in allowed] + + # Normal mode - use context analysis + hints = self._analyze_context(context) + allowed_tools = set(TIER_0_TOOLS) + + for hint in hints: + if hint in TIER_1_BY_CONTEXT: + allowed_tools.update(TIER_1_BY_CONTEXT[hint]) + + # Filter and rank tools + result = [] + for tool in all_tools: + name = tool["function"]["name"] + if name in allowed_tools: + result.append(tool) + + return result + + def load_tier_2_tool(self, tool_name: str) -> Optional[dict]: + """ + On-demand load a Tier 2 tool. + """ + if tool_name not in self._tier_2_cache: + self._ensure_tools_discovered() + self._tier_2_cache[tool_name] = self._load_single_tool(tool_name) + return self._tier_2_cache.get(tool_name) + + def preload_for_tool(self, tool_name: str) -> None: + """ + Preload a tool's schema into cache. + """ + if tool_name in TIER_2_TOOLS: + self.load_tier_2_tool(tool_name) + + def _ensure_tools_discovered(self) -> None: + """Ensure all tools have been discovered and registered.""" + if not self._tools_discovered: + try: + import model_tools + self._tools_discovered = True + except Exception: + pass + + def _analyze_context(self, context: str) -> Set[str]: + """Analyze context text and extract hints.""" + hints = set() + context_lower = context.lower() + + # Extract code extensions + extensions = re.findall(r'\.(\w+)', context_lower) + for ext in extensions: + if ext in CODE_EXTENSIONS: + hints.add(f"code:{CODE_EXTENSIONS[ext]}") + + # Check domain keywords + for keyword in DOMAIN_KEYWORDS: + if keyword in context_lower: + hints.add(keyword) + + # Check for specific phrases + tool_mentions = { + "web search": "web", + "search the web": "web", + "golang": "code:go", + "go language": "code:go", + "go code": "code:go", + " in go ": "code:go", + "write go ": "code:go", + "python code": "code:python", + "python script": "code:python", + " in python": "code:python", + "write python": "code:python", + "rust code": "code:rust", + "rust program": "code:rust", + " in rust": "code:rust", + "write rust": "code:rust", + "javascript": "code:js", + "typescript": "code:js", + "c++": "code:cpp", + "cron": "database", # Schedule jobs via terminal + "schedule": "database", + "voice": "ai", + "speak": "ai", + "delegate": "ai", + "subagent": "ai", + } + + for phrase, hint in tool_mentions.items(): + if phrase in context_lower: + hints.add(hint) + + return hints + + def _get_tier_1_tools(self) -> List[dict]: + """Get Tier 1 tools based on context analysis.""" + tools = [] + seen = set() + + for hint in self._context_hints: + if hint in TIER_1_BY_CONTEXT: + for tool_name in TIER_1_BY_CONTEXT[hint]: + if tool_name not in seen: + seen.add(tool_name) + tool_def = self._load_single_tool(tool_name) + if tool_def: + tools.append(tool_def) + + return tools + + def _load_tier_0_tools(self) -> List[dict]: + """Load Tier 0 core tools.""" + tools = [] + for tool_name in TIER_0_TOOLS: + tool_def = self._load_single_tool(tool_name) + if tool_def: + tools.append(tool_def) + return tools + + def _load_single_tool(self, tool_name: str) -> Optional[dict]: + """Load a single tool's schema from registry.""" + try: + from tools.registry import registry + + entry = registry._tools.get(tool_name) + if not entry: + return None + + # Check if tool is available + if entry.check_fn: + try: + if not entry.check_fn(): + return None + except Exception: + return None + + schema = entry.schema.copy() + schema["name"] = entry.name + return {"type": "function", "function": schema} + + except Exception as e: + logger.debug(f"Failed to load tool {tool_name}: {e}") + return None + + +# ============================================================================= +# Integration helpers +# ============================================================================= + +def filter_tools_with_context( + all_tools: List[dict], + context: str, + context_usage: float = 0.0, +) -> List[dict]: + """ + Filter a pre-loaded tool list based on context. + + Usage: + from model_tools import get_tool_definitions + from agent.lazy_loader import filter_tools_with_context + + all_tools = get_tool_definitions(enabled_toolsets=["terminal"]) + filtered_tools = filter_tools_with_context( + all_tools, + context=conversation_context, + context_usage=0.5 + ) + """ + loader = get_lazy_tool_loader() + return loader.get_tools_from_existing(all_tools, context, context_usage) + + +def get_skills_for_context( + context: str, + context_usage: float = 0.0, +) -> List[str]: + """ + Get skill paths to load for the current context. + + Usage: + from agent.lazy_loader import get_skills_for_context + + skill_paths = get_skills_for_context("I need to review Python code") + for path in skill_paths: + content = loader.get_skill_content(path) + # Inject into context... + """ + loader = get_lazy_tool_loader() + return loader.get_active_skills(context, context_usage) + + +# ============================================================================= +# Singleton instance +# ============================================================================= + +_lazy_tool_loader: Optional[LazyToolLoader] = None + + +def get_lazy_tool_loader() -> LazyToolLoader: + """Get the singleton LazyToolLoader instance.""" + global _lazy_tool_loader + if _lazy_tool_loader is None: + _lazy_tool_loader = LazyToolLoader() + return _lazy_tool_loader + + +# ============================================================================= +# Helper functions +# ============================================================================= + +def get_tools_budget_aware( + context: str, + context_usage: float, + enabled_toolsets: Optional[List[str]] = None, +) -> List[dict]: + """ + Convenience function for budget-aware tool loading. + """ + loader = get_lazy_tool_loader() + return loader.get_tools_for_context( + context=context, + enabled_toolsets=enabled_toolsets, + context_usage=context_usage, + ) + + +def preload_tools_for_hint(hint: str) -> None: + """Preload tools based on a context hint.""" + loader = get_lazy_tool_loader() + loader._context_hints.add(hint) diff --git a/agent/manifest.py b/agent/manifest.py new file mode 100644 index 00000000..57a78267 --- /dev/null +++ b/agent/manifest.py @@ -0,0 +1,548 @@ +""" +Manifest Profile System + +Defines named agent profiles with specific skill sets. +Users or context can select which profile to use, +allowing selective skill loading instead of loading everything. + +Inspired by ECC's agent manifest system. + +Usage: + from agent.manifest import ManifestLoader, get_active_profile, switch_profile + + # Get current profile + profile = get_active_profile() + + # Switch to a different profile + switch_profile("coding") + + # Load skills for current profile + loader = ManifestLoader() + skills = loader.get_skills_for_profile("debugging") +""" + +import yaml +import logging +from pathlib import Path +from typing import Dict, List, Optional, Set, Any, Callable +from dataclasses import dataclass, field +from functools import lru_cache + +from hermes_constants import get_hermes_home + +logger = logging.getLogger(__name__) + + +# ============================================================================= +# Data Structures +# ============================================================================= + +@dataclass +class SkillRef: + """Reference to a skill with optional parameters.""" + path: str + enabled: bool = True + priority: int = 0 # Lower = higher priority + description: Optional[str] = None + + +@dataclass +class Profile: + """An agent profile with associated skills.""" + name: str + description: str + skills: List[SkillRef] + tags: List[str] = field(default_factory=list) + auto_activate: bool = False # Auto-activate based on context hints + + @property + def skill_paths(self) -> List[str]: + """Get list of skill paths for this profile.""" + return [s.path for s in self.skills if s.enabled] + + +@dataclass +class Manifest: + """Complete manifest with all profiles.""" + version: str + profiles: Dict[str, Profile] + default_profile: str + metadata: Dict[str, Any] = field(default_factory=dict) + + @classmethod + def from_dict(cls, data: Dict) -> "Manifest": + """Create Manifest from dictionary.""" + profiles = {} + for name, profile_data in data.get("profiles", {}).items(): + skills = [] + for skill_data in profile_data.get("skills", []): + if isinstance(skill_data, str): + # Simple skill reference + skills.append(SkillRef(path=skill_data)) + elif isinstance(skill_data, dict): + # Skill with extra config + skills.append(SkillRef( + path=skill_data.get("path", skill_data.get("name", "")), + enabled=skill_data.get("enabled", True), + priority=skill_data.get("priority", 0), + description=skill_data.get("description"), + )) + profiles[name] = Profile( + name=profile_data.get("name", name), + description=profile_data.get("description", ""), + skills=skills, + tags=profile_data.get("tags", []), + auto_activate=profile_data.get("auto_activate", False), + ) + + return cls( + version=data.get("version", "1.0.0"), + profiles=profiles, + default_profile=data.get("default_profile", "general"), + metadata=data.get("metadata", {}), + ) + + +# ============================================================================= +# Built-in Default Manifest +# ============================================================================= + +DEFAULT_MANIFEST_YAML = """ +# Hermes Agent Manifest +# Defines agent profiles with specific skill sets +# Profiles allow selective skill loading based on context + +version: "1.0.0" +default_profile: coding + +profiles: + coding: + name: "Coding Agent" + description: "For code review, refactoring, and implementation" + tags: [code, review, refactor] + auto_activate: true + skills: + - path: software-development/python-reviewer + priority: 10 + - path: software-development/go-reviewer + priority: 20 + - path: software-development/rust-reviewer + priority: 20 + - path: software-development/javascript-reviewer + priority: 30 + - path: software-development/two-stage-review + priority: 5 + - path: software-development/build-error-resolver + priority: 15 + + debugging: + name: "Debug Agent" + description: "For systematic debugging and problem solving" + tags: [debug, problem-solving] + auto_activate: true + skills: + - path: software-development/systematic-debugging + priority: 5 + - path: software-development/build-error-resolver + priority: 10 + + shipping: + name: "Ship Agent" + description: "For deployment, CI/CD, and releasing" + tags: [deploy, ship, ci-cd] + auto_activate: true + skills: + - path: software-development/idempotent-ship + priority: 5 + - path: software-development/hooks + priority: 10 + + planning: + name: "Planning Agent" + description: "For brainstorming and planning" + tags: [plan, brainstorm] + auto_activate: true + skills: + - path: software-development/brainstorming + priority: 5 + - path: software-development/writing-plans + priority: 10 + + research: + name: "Research Agent" + description: "For ML/AI research and exploration" + tags: [research, ml, ai] + auto_activate: true + skills: + - path: research/llm-wiki + priority: 5 + - path: mlops/huggingface-hub + priority: 10 + - path: mlops/weights-and-biases + priority: 20 + + general: + name: "General Agent" + description: "General purpose assistant" + tags: [general] + skills: [] +""" + + +# ============================================================================= +# Manifest Loader +# ============================================================================= + +class ManifestLoader: + """ + Loads and manages agent manifest profiles. + + The manifest defines named profiles with specific skill sets. + Skills are loaded selectively based on the active profile. + """ + + def __init__(self, manifest_path: Optional[Path] = None): + """ + Initialize manifest loader. + + Args: + manifest_path: Path to manifest YAML file. + Defaults to ~/.hermes/manifest.yaml + """ + if manifest_path is None: + manifest_path = get_hermes_home() / "manifest.yaml" + + self.manifest_path = manifest_path + self._manifest: Optional[Manifest] = None + self._active_profile_name: Optional[str] = None + self._profile_hints: Set[str] = set() + + def load_manifest(self) -> Manifest: + """Load manifest from file or use default.""" + if self._manifest is not None: + return self._manifest + + # Try to load from file + if self.manifest_path.exists(): + try: + data = yaml.safe_load(self.manifest_path.read_text()) + self._manifest = Manifest.from_dict(data) + logger.info(f"Loaded manifest from {self.manifest_path}") + return self._manifest + except Exception as e: + logger.warning(f"Failed to load manifest: {e}") + + # Use default manifest + self._manifest = Manifest.from_dict(yaml.safe_load(DEFAULT_MANIFEST_YAML)) + logger.info("Using default manifest") + return self._manifest + + def get_profile(self, name: str) -> Optional[Profile]: + """Get a profile by name.""" + manifest = self.load_manifest() + return manifest.profiles.get(name) + + def get_active_profile(self) -> Profile: + """Get the currently active profile.""" + manifest = self.load_manifest() + + if self._active_profile_name and self._active_profile_name in manifest.profiles: + return manifest.profiles[self._active_profile_name] + + # Return default profile + return manifest.profiles[manifest.default_profile] + + def switch_profile(self, name: str) -> bool: + """ + Switch to a different profile. + + Args: + name: Profile name to switch to + + Returns: + True if switch successful, False if profile doesn't exist + """ + manifest = self.load_manifest() + if name not in manifest.profiles: + logger.warning(f"Profile '{name}' not found") + return False + + old_profile = self._active_profile_name + self._active_profile_name = name + logger.info(f"Switched profile: {old_profile} → {name}") + return True + + def auto_select_profile(self, context: str) -> Profile: + """ + Automatically select profile based on context. + + Uses tag matching and hint detection to find the best profile. + Uses word boundary matching to avoid false positives. + Profile-specific tags (defined in profile) get higher weight. + + Args: + context: Conversation context + + Returns: + Selected profile + """ + import re + manifest = self.load_manifest() + context_lower = context.lower() + + best_profile = None + best_score = 0 + best_match_count = 0 + + # Pre-process context into words + context_words = set(re.findall(r'\b\w+\b', context_lower)) + + for name, profile in manifest.profiles.items(): + if not profile.auto_activate: + continue + + score = 0 + match_count = 0 + + # Check tags against context (word boundary matching) + # Primary tag (first in list) and early-position matches get higher weight + for i, tag in enumerate(profile.tags): + if not tag: + continue + + # Check if tag appears in context + if tag in context_words: + # Primary tag (first) gets higher base weight + base_weight = 20 if i == 0 else 8 + + # Find position of tag in original context for position scoring + tag_pos = context_lower.find(tag) + if tag_pos >= 0: + # Tags appearing in first 30% of context get bonus (main intent) + if tag_pos < len(context_lower) * 0.3: + score += base_weight + 5 + else: + score += base_weight + else: + score += base_weight + match_count += 1 + + # Fall back to substring match (lower priority) + elif tag in context_lower: + base_weight = 8 if i == 0 else 3 + score += base_weight + match_count += 1 + + # Profile name match is a strong signal (user explicitly mentioned this agent type) + profile_name_words = set(re.findall(r'\b\w+\b', profile.name.lower())) + for word in profile_name_words: + if word in context_words and len(word) > 3: # Skip short words + score += 20 # Strong signal + match_count += 2 + + # Check if profile name/description matches + if profile.name.lower() in context_lower: + score += 3 + if profile.description.lower() in context_lower: + score += 1 + + # Update best if this profile is better + # Tie-breaker: prefer profile with more tag matches + if score > best_score or (score == best_score and match_count > best_match_count): + best_score = score + best_match_count = match_count + best_profile = profile + + if best_profile and best_score > 0: + self._active_profile_name = best_profile.name + logger.info(f"Auto-selected profile: {best_profile.name} (score={best_score})") + return best_profile + + # Return default + return manifest.profiles[manifest.default_profile] + + def get_skills_for_profile(self, profile_name: str) -> List[str]: + """ + Get skill paths for a profile. + + Args: + profile_name: Name of the profile + + Returns: + List of skill paths + """ + profile = self.get_profile(profile_name) + if profile is None: + return [] + return profile.skill_paths + + def get_skills_for_active_profile(self) -> List[str]: + """Get skill paths for the active profile.""" + return self.get_active_profile().skill_paths + + def add_profile(self, profile: Profile) -> None: + """ + Add a new profile to the manifest. + + Args: + profile: Profile to add + """ + manifest = self.load_manifest() + manifest.profiles[profile.name] = profile + + def remove_profile(self, name: str) -> bool: + """ + Remove a profile from the manifest. + + Args: + name: Profile name to remove + + Returns: + True if removed, False if didn't exist + """ + manifest = self.load_manifest() + if name in manifest.profiles: + del manifest.profiles[name] + if self._active_profile_name == name: + self._active_profile_name = manifest.default_profile + return True + return False + + def save_manifest(self) -> bool: + """ + Save current manifest to file. + + Returns: + True if saved successfully + """ + if self._manifest is None: + return False + + try: + # Convert manifest to dict + data = { + "version": self._manifest.version, + "default_profile": self._manifest.default_profile, + "metadata": self._manifest.metadata, + "profiles": {}, + } + + for name, profile in self._manifest.profiles.items(): + skills = [] + for skill in profile.skills: + skill_dict = {"path": skill.path} + if not skill.enabled: + skill_dict["enabled"] = False + if skill.priority != 0: + skill_dict["priority"] = skill.priority + if skill.description: + skill_dict["description"] = skill.description + skills.append(skill_dict) + + data["profiles"][name] = { + "name": profile.name, + "description": profile.description, + "tags": profile.tags, + "auto_activate": profile.auto_activate, + "skills": skills, + } + + # Ensure directory exists + self.manifest_path.parent.mkdir(parents=True, exist_ok=True) + + # Write file + self.manifest_path.write_text(yaml.dump(data, default_flow_style=False)) + logger.info(f"Saved manifest to {self.manifest_path}") + return True + + except Exception as e: + logger.error(f"Failed to save manifest: {e}") + return False + + +# ============================================================================= +# Integration with LazyToolLoader +# ============================================================================= + +def get_skills_for_current_profile( + context: Optional[str] = None, + manifest_path: Optional[Path] = None, +) -> List[str]: + """ + Get skill paths for the current profile. + + This integrates with the lazy loader to provide profile-based + skill selection. + + Args: + context: Optional context for auto-selection + manifest_path: Optional path to manifest file + + Returns: + List of skill paths + """ + loader = ManifestLoader(manifest_path) + + # Auto-select profile if context provided + if context: + loader.auto_select_profile(context) + + return loader.get_skills_for_active_profile() + + +def get_profile_skill_content( + profile_name: Optional[str] = None, + manifest_path: Optional[Path] = None, +) -> str: + """ + Get combined skill content for a profile. + + Args: + profile_name: Profile to get content for. If None, uses active. + manifest_path: Optional path to manifest file + + Returns: + Combined skill content string + """ + from agent.lazy_loader import get_lazy_tool_loader + + loader = ManifestLoader(manifest_path) + lazy = get_lazy_tool_loader() + + if profile_name: + skills = loader.get_skills_for_profile(profile_name) + else: + skills = loader.get_skills_for_active_profile() + + combined = [] + for skill_path in skills: + content = lazy.get_skill_content(skill_path) + if content: + combined.append(f"\n\n=== Skill: {skill_path} ===\n{content}") + + return "\n".join(combined) + + +# ============================================================================= +# Singleton +# ============================================================================= + +_manifest_loader: Optional[ManifestLoader] = None + + +def get_manifest_loader() -> ManifestLoader: + """Get the singleton ManifestLoader instance.""" + global _manifest_loader + if _manifest_loader is None: + _manifest_loader = ManifestLoader() + return _manifest_loader + + +def get_active_profile() -> Profile: + """Get the active profile.""" + return get_manifest_loader().get_active_profile() + + +def switch_profile(name: str) -> bool: + """Switch to a different profile.""" + return get_manifest_loader().switch_profile(name) diff --git a/hermes-gateway.service b/hermes-gateway.service new file mode 100644 index 00000000..fe480c8b --- /dev/null +++ b/hermes-gateway.service @@ -0,0 +1,16 @@ +[Unit] +Description=Hermes Gateway - AI Agent Messaging Platform +After=network.target + +[Service] +Type=simple +User=long +WorkingDirectory=/home/long/hermes-agent +Environment="PATH=/home/long/hermes-agent/venv/bin:/usr/local/bin:/usr/bin:/bin" +Environment="HERMES_HOME=/home/long/.hermes" +ExecStart=/home/long/hermes-agent/venv/bin/python /home/long/hermes-agent/gateway/run.py +Restart=always +RestartSec=10 + +[Install] +WantedBy=multi-user.target \ No newline at end of file diff --git a/model_tools.py b/model_tools.py index 1924b251..d18752cf 100644 --- a/model_tools.py +++ b/model_tools.py @@ -158,6 +158,8 @@ def _discover_tools(): "tools.send_message_tool", # "tools.honcho_tools", # Removed — Honcho is now a memory provider plugin "tools.homeassistant_tool", + "tools.hook_tool", + "tools.reviewer_tool", ] import importlib for mod_name in _modules: diff --git a/tools/file_tools.py b/tools/file_tools.py index ca2118c3..bb26c26e 100644 --- a/tools/file_tools.py +++ b/tools/file_tools.py @@ -547,6 +547,23 @@ def write_file_tool(path: str, content: str, task_id: str = "default") -> str: stale_warning = _check_file_staleness(path, task_id) file_ops = _get_file_ops(task_id) result = file_ops.write_file(path, content) + + # Run post-write hooks on successful write + if result.error is None: + try: + from tools.hooks import run_hooks, EVENT_FILE_WRITE + hook_results = run_hooks(EVENT_FILE_WRITE, file_path=path) + blocked = [r for r in hook_results if r.blocked] + if blocked: + # Hook blocked - pre-write or post-write hook failed with block_on_failure + return json.dumps({ + "error": f"Hook blocked: {blocked[0].message}", + "_hooks_blocked": True, + "_hook_name": blocked[0].hook_name, + }, ensure_ascii=False) + except Exception as hook_err: + logger.warning("Hook execution failed: %s", hook_err) + result_dict = result.to_dict() if stale_warning: result_dict["_warning"] = stale_warning diff --git a/tools/hook_tool.py b/tools/hook_tool.py new file mode 100644 index 00000000..06655008 --- /dev/null +++ b/tools/hook_tool.py @@ -0,0 +1,150 @@ +"""Hook management tools for Hermes. + +Provides tools for listing, creating, deleting, enabling, and disabling hooks. +Hooks are YAML files in ~/.hermes/hooks/ that run actions on file/command events. +""" + +import json +import os + +from tools.registry import registry + +# Lazy import to avoid circular imports +def _get_hooks_module(): + from tools import hooks as _hooks + return _hooks + + +def _hooks_list(action: str = "list", hook_path: str = None) -> str: + """Tool wrapper for hook operations.""" + hooks = _get_hooks_module() + + if action == "list": + result = hooks.list_hooks() + if not result: + return json.dumps({"hooks": {}, "message": "No hooks configured"}) + return json.dumps({"hooks": result}, ensure_ascii=False) + + elif action == "create": + if not hook_path: + return json.dumps({"error": "hook_path required for create"}) + parts = hook_path.split("/", 1) + if len(parts) != 2: + return json.dumps({"error": "hook_path must be in format '/' (e.g. 'post-write/pytest-on-py')"}) + subdir, name = parts + if subdir not in ("pre-write", "post-write", "pre-command", "post-command"): + return json.dumps({"error": f"Invalid subdir '{subdir}'. Must be one of: pre-write, post-write, pre-command, post-command"}) + success, msg = hooks.create_hook( + name=name, + subdir=subdir, + trigger_event="file_write", + pattern="*.py", + actions=[{"type": "command", "command": "pytest {file}", "timeout": 60, "block_on_failure": True}], + description=f"Auto-created hook: {name}", + ) + return json.dumps({"success": success, "message": msg}, ensure_ascii=False) + + elif action == "delete": + if not hook_path: + return json.dumps({"error": "hook_path required for delete"}) + success, msg = hooks.delete_hook(hook_path) + return json.dumps({"success": success, "message": msg}, ensure_ascii=False) + + elif action == "enable": + if not hook_path: + return json.dumps({"error": "hook_path required for enable"}) + success, msg = hooks.enable_hook(hook_path) + return json.dumps({"success": success, "message": msg}, ensure_ascii=False) + + elif action == "disable": + if not hook_path: + return json.dumps({"error": "hook_path required for disable"}) + success, msg = hooks.disable_hook(hook_path) + return json.dumps({"success": success, "message": msg}, ensure_ascii=False) + + elif action == "dry_run": + if not hook_path: + return json.dumps({"error": "hook_path required for dry_run"}) + # Dry run a specific hook + hooks_dir = hooks._get_hooks_dir() + full_path = hooks_dir / hook_path + if not full_path.exists(): + return json.dumps({"error": f"Hook not found: {hook_path}"}) + hook = hooks._load_hook_file(full_path) + if not hook: + return json.dumps({"error": f"Failed to load hook: {hook_path}"}) + # Simulate what would happen + return json.dumps({ + "hook": hook.name, + "description": hook.description, + "trigger_event": hook.trigger_event, + "trigger_pattern": hook.trigger_pattern, + "actions": [{"type": a.type, "command": a.command} for a in hook.actions], + "dry_run": True, + "note": "Commands would be shown but not executed", + }, ensure_ascii=False) + + else: + return json.dumps({"error": f"Unknown action: {action}"}) + + +# Schema for hook tool +_hooks_schema = { + "name": "hooks", + "description": """Manage Hermes hooks — trigger-based automation for file and command events. + +Hooks run actions (commands, notifications) when files are written or commands are executed. + +**Hook directories:** +- `pre-write/` — before file write (can block the write) +- `post-write/` — after file write +- `pre-command/` — before terminal command (can block the command) +- `post-command/` — after terminal command + +**Actions:** +- `command` — run a shell command +- `notification` — log or send notification +- `log` — log a message + +**Examples:** +- List all hooks: `{"action": "list"}` +- Create a Python test hook: `{"action": "create", "hook_path": "post-write/pytest-on-py"}` +- Delete a hook: `{"action": "delete", "hook_path": "post-write/pytest-on-py"}` +- Dry run a hook: `{"action": "dry_run", "hook_path": "post-write/pytest-on-py"}`""", + "parameters": { + "type": "object", + "properties": { + "action": { + "type": "string", + "enum": ["list", "create", "delete", "enable", "disable", "dry_run"], + "description": "Action to perform", + }, + "hook_path": { + "type": "string", + "description": "Hook path for create/delete/enable/disable/dry_run. Format: '/' e.g. 'post-write/pytest-on-py'", + }, + }, + "required": ["action"], + }, +} + + +def _check_hooks_available() -> bool: + """Check if hooks module is available.""" + return True + + +registry.register( + name="hooks", + toolset="file", + schema=_hooks_schema, + handler=lambda args, **kw: _hooks_list( + action=args.get("action", "list"), + hook_path=args.get("hook_path"), + ), + check_fn=_check_hooks_available, + requires_env=[], + is_async=False, + description="Manage Hermes hooks for file/command automation", + emoji="🪝", +) diff --git a/tools/hooks.py b/tools/hooks.py new file mode 100644 index 00000000..8015b206 --- /dev/null +++ b/tools/hooks.py @@ -0,0 +1,578 @@ +#!/usr/bin/env python3 +""" +Hermes Hook System - Trigger-based automation for file and command events. + +Hooks are YAML config files in ~/.hermes/hooks/ that run actions when +specific events occur (file writes, terminal commands, etc.). + +Directory Structure: + ~/.hermes/hooks/ + ├── pre-write/ # Before file write + │ └── .yaml + ├── post-write/ # After file write + │ └── .yaml + ├── pre-command/ # Before terminal command + │ └── .yaml + └── post-command/ # After terminal command + └── .yaml + +Hook Config Format: + name: pytest-on-py + description: Run pytest on Python files after write + enabled: true + + trigger: + event: file_write # file_write | file_patch | terminal_command + pattern: "*.py" # glob pattern + path: null # optional directory filter + + actions: + - type: command + command: pytest {file} + timeout: 60 + block_on_failure: true + + - type: notification + message: "Tests passed for {file}" + + conditions: + if_file_exists: true # only for pre-write (file already exists) + if_command_exists: [pytest, ruff] + +Exit Codes: + - Hook command exit 0 + block_on_failure=false → continue + - Hook command exit 0 + block_on_failure=true → continue + - Hook command exit != 0 + block_on_failure=true → BLOCK + - Hook command exit != 0 + block_on_failure=false → warn + continue +""" + +import fnmatch +import json +import logging +import os +import subprocess +import threading +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, Callable, Optional + +import yaml + +from hermes_constants import get_hermes_home + +logger = logging.getLogger(__name__) + +# Hook event types +EVENT_FILE_WRITE = "file_write" +EVENT_FILE_PATCH = "file_patch" +EVENT_FILE_READ = "file_read" +EVENT_TERMINAL_COMMAND = "terminal_command" + +# Hook directories relative to HERMES_HOME +HOOK_SUBDIRS = ["pre-write", "post-write", "pre-command", "post-command"] + +# Cache for loaded hooks (thread-safe) +_hooks_cache: dict[str, list["Hook"]] = {} +_cache_lock = threading.RLock() +_cache_version: int = 0 + + +def _get_hooks_dir() -> Path: + """Return the hooks directory path.""" + return get_hermes_home() / "hooks" + + +@dataclass +class HookAction: + """A single action within a hook.""" + + type: str # "command", "notification", "log" + command: Optional[str] = None + message: Optional[str] = None + timeout: int = 30 + block_on_failure: bool = False + continue_on_error: bool = False + platform: Optional[str] = None # telegram, discord, etc. + + +@dataclass +class Hook: + """A loaded hook configuration.""" + + name: str + description: str + enabled: bool = True + trigger_event: Optional[str] = None + trigger_pattern: Optional[str] = None + trigger_path: Optional[str] = None + actions: list[HookAction] = field(default_factory=list) + conditions: dict[str, Any] = field(default_factory=dict) + source_file: Optional[Path] = None + + def matches(self, event: str, file_path: Optional[str] = None) -> bool: + """Check if this hook matches the given event and file.""" + if not self.enabled: + return False + if self.trigger_event and self.trigger_event != event: + return False + if self.trigger_pattern and file_path: + if not fnmatch.fnmatch(Path(file_path).name, self.trigger_pattern): + return False + if self.trigger_path and file_path: + if not Path(file_path).as_posix().startswith(self.trigger_path): + return False + return True + + +@dataclass +class HookResult: + """Result of running a hook.""" + + hook_name: str + success: bool + blocked: bool = False + message: str = "" + details: str = "" + + +def load_hooks(event: str) -> list[Hook]: + """Load all enabled hooks matching the given event type.""" + global _cache_version + + cache_key = event + with _cache_lock: + # Check if we have a valid cache + if cache_key in _hooks_cache: + return _hooks_cache[cache_key] + + hooks_dir = _get_hooks_dir() + loaded_hooks: list[Hook] = [] + + # Determine which subdirs to check based on event + subdir_map = { + EVENT_FILE_WRITE: ["pre-write", "post-write"], + EVENT_FILE_PATCH: ["pre-write", "post-write"], + EVENT_FILE_READ: [], + EVENT_TERMINAL_COMMAND: ["pre-command", "post-command"], + } + subdirs = subdir_map.get(event, []) + + for subdir in subdirs: + subdir_path = hooks_dir / subdir + if not subdir_path.is_dir(): + continue + + for hook_file in subdir_path.glob("*.yaml"): + try: + hook = _load_hook_file(hook_file) + if hook and hook.matches(event): + loaded_hooks.append(hook) + except Exception as e: + logger.warning(f"Failed to load hook {hook_file}: {e}") + + with _cache_lock: + _hooks_cache[cache_key] = loaded_hooks + + return loaded_hooks + + +def _load_hook_file(path: Path) -> Optional[Hook]: + """Load a single hook YAML file.""" + try: + with open(path, "r") as f: + data = yaml.safe_load(f) + + if not data: + return None + + name = data.get("name", path.stem) + enabled = data.get("enabled", True) + trigger = data.get("trigger", {}) + + actions = [] + for action_data in data.get("actions", []): + action = HookAction( + type=action_data.get("type", "command"), + command=action_data.get("command"), + message=action_data.get("message"), + timeout=action_data.get("timeout", 30), + block_on_failure=action_data.get("block_on_failure", False), + continue_on_error=action_data.get("continue_on_error", False), + platform=action_data.get("platform"), + ) + actions.append(action) + + conditions = data.get("conditions", {}) + + # Determine event type from subdir + subdir = path.parent.name + event_map = { + "pre-write": EVENT_FILE_WRITE, + "post-write": EVENT_FILE_WRITE, + "pre-command": EVENT_TERMINAL_COMMAND, + "post-command": EVENT_TERMINAL_COMMAND, + } + hook_event = trigger.get("event") or event_map.get(subdir, EVENT_FILE_WRITE) + + return Hook( + name=name, + description=data.get("description", ""), + enabled=enabled, + trigger_event=hook_event, + trigger_pattern=trigger.get("pattern"), + trigger_path=trigger.get("path"), + actions=actions, + conditions=conditions, + source_file=path, + ) + except Exception as e: + logger.error(f"Error loading hook {path}: {e}") + return None + + +def _check_conditions(hook: Hook, file_path: Optional[str] = None) -> bool: + """Check if hook conditions are met.""" + conditions = hook.conditions + + # Check if_command_exists + if "if_command_exists" in conditions: + required_commands = conditions["if_command_exists"] + if isinstance(required_commands, str): + required_commands = [required_commands] + for cmd in required_commands: + if not _command_exists(cmd): + return False + + # Check if_file_exists + if "if_file_exists" in conditions: + if file_path: + exists = Path(file_path).exists() + if conditions["if_file_exists"] and not exists: + return False + if not conditions["if_file_exists"] and exists: + return False + + return True + + +def _command_exists(cmd: str) -> bool: + """Check if a command exists in PATH.""" + try: + result = subprocess.run( + ["which", cmd], + capture_output=True, + text=True, + timeout=5, + ) + return result.returncode == 0 + except Exception: + return False + + +def _expand_variables(text: str, context: dict[str, Any]) -> str: + """Expand {variable} placeholders in text.""" + result = text + for key, value in context.items(): + placeholder = "{" + key + "}" + if placeholder in result: + result = result.replace(placeholder, str(value)) + return result + + +def _run_hook_action( + action: HookAction, context: dict[str, Any], dry_run: bool = False +) -> tuple[bool, str]: + """ + Run a single hook action. + + Returns (success, message). + """ + if action.type == "command": + if not action.command: + return True, "No command specified" + + if dry_run: + return True, f"[DRY RUN] Would run: {action.command}" + + command = _expand_variables(action.command, context) + + try: + result = subprocess.run( + command, + shell=True, + capture_output=True, + text=True, + timeout=action.timeout, + ) + + if result.returncode != 0: + error_msg = f"Command failed: {result.stderr or result.stdout}" + if action.block_on_failure: + return False, error_msg + elif not action.continue_on_error: + return False, error_msg + # continue_on_error = True → warn but continue + return True, f"WARN: {error_msg}" + return True, result.stdout or "Command succeeded" + + except subprocess.TimeoutExpired: + return False, f"Command timed out after {action.timeout}s" + except Exception as e: + return False, f"Command error: {e}" + + elif action.type == "notification": + message = _expand_variables(action.message or "", context) + platform = action.platform + + if platform == "telegram": + # Would integrate with send_message_tool + logger.info(f"[NOTIFICATION] {message}") + return True, f"Notification sent: {message}" + elif platform == "discord": + logger.info(f"[DISCORD] {message}") + return True, f"Discord notification sent: {message}" + else: + # Log to stdout + logger.info(f"[NOTIFICATION] {message}") + return True, message + + elif action.type == "log": + message = _expand_variables(action.message or "", context) + logger.info(f"[HOOK LOG] {message}") + return True, message + + return True, f"Unknown action type: {action.type}" + + +def run_hooks( + event: str, + file_path: Optional[str] = None, + command: Optional[str] = None, + context: Optional[dict[str, Any]] = None, + dry_run: bool = False, +) -> list[HookResult]: + """ + Run all hooks matching the given event. + + Args: + event: Event type (EVENT_FILE_WRITE, etc.) + file_path: File path for file events + command: Command for terminal events + context: Additional context for variable expansion + dry_run: If True, don't execute commands, just show what would run + + Returns: + List of HookResult, one per hook that ran + """ + hooks = load_hooks(event) + results: list[HookResult] = [] + blocked = False + + # Build context for variable expansion + ctx = context or {} + if file_path: + ctx["file"] = file_path + ctx["filename"] = Path(file_path).name + ctx["dir"] = str(Path(file_path).parent) + if command: + ctx["command"] = command + + for hook in hooks: + # Check conditions + if not _check_conditions(hook, file_path): + continue + + # Check if pre-write and file doesn't exist (for new files) + if "if_file_exists" in hook.conditions: + # Already checked in _check_conditions + pass + + hook_result = HookResult( + hook_name=hook.name, + success=True, + blocked=False, + message="", + details="", + ) + + for action in hook.actions: + success, message = _run_hook_action(action, ctx, dry_run) + + if not success: + hook_result.success = False + hook_result.details = message + if action.block_on_failure: + hook_result.blocked = True + hook_result.message = f"Hook '{hook.name}' blocked: {message}" + blocked = True + results.append(hook_result) + return results # Early return on block + else: + hook_result.message = f"Hook '{hook.name}' warned: {message}" + else: + if message: + hook_result.details += message + "\n" + + results.append(hook_result) + + if blocked: + # Add a summary result if hooks blocked + results.insert( + 0, + HookResult( + hook_name="__HOOK_BLOCK__", + success=False, + blocked=True, + message="Hook execution was blocked", + ), + ) + + return results + + +def invalidate_cache(): + """Clear the hooks cache to force reload.""" + global _cache_version + with _cache_lock: + _hooks_cache.clear() + _cache_version += 1 + + +def list_hooks() -> dict[str, list[dict[str, Any]]]: + """List all configured hooks by type.""" + hooks_dir = _get_hooks_dir() + result: dict[str, list[dict[str, Any]]] = {} + + for subdir in HOOK_SUBDIRS: + subdir_path = hooks_dir / subdir + if not subdir_path.is_dir(): + continue + + hooks_list: list[dict[str, Any]] = [] + for hook_file in sorted(subdir_path.glob("*.yaml")): + try: + hook = _load_hook_file(hook_file) + if hook: + hooks_list.append( + { + "name": hook.name, + "description": hook.description, + "enabled": hook.enabled, + "trigger_event": hook.trigger_event, + "trigger_pattern": hook.trigger_pattern, + "actions_count": len(hook.actions), + "source": str(hook_file.relative_to(hooks_dir)), + } + ) + except Exception as e: + logger.warning(f"Failed to load hook {hook_file}: {e}") + + if hooks_list: + result[subdir] = hooks_list + + return result + + +def create_hook( + name: str, + subdir: str, + trigger_event: str, + pattern: Optional[str] = None, + actions: Optional[list[dict[str, Any]]] = None, + description: str = "", +) -> tuple[bool, str]: + """ + Create a new hook file. + + Args: + name: Hook name (will be filename) + subdir: One of pre-write, post-write, pre-command, post-command + trigger_event: Event type + pattern: Optional glob pattern + actions: List of action dicts + description: Hook description + + Returns: + (success, message) + """ + hooks_dir = _get_hooks_dir() + subdir_path = hooks_dir / subdir + + try: + subdir_path.mkdir(parents=True, exist_ok=True) + + hook_data = { + "name": name, + "description": description, + "enabled": True, + "trigger": {"event": trigger_event}, + "actions": actions or [], + } + + if pattern: + hook_data["trigger"]["pattern"] = pattern + + hook_file = subdir_path / f"{name}.yaml" + with open(hook_file, "w") as f: + yaml.dump(hook_data, f, default_flow_style=False, sort_keys=False) + + invalidate_cache() + return True, f"Created hook: {hook_file}" + + except Exception as e: + return False, f"Failed to create hook: {e}" + + +def delete_hook(hook_path: str) -> tuple[bool, str]: + """ + Delete a hook by its relative path (e.g., 'post-write/pytest-on-py'). + + Returns: + (success, message) + """ + hooks_dir = _get_hooks_dir() + full_path = hooks_dir / hook_path + + if not full_path.exists() or not str(full_path).startswith(str(hooks_dir)): + return False, "Hook not found or invalid path" + + try: + full_path.unlink() + invalidate_cache() + return True, f"Deleted hook: {hook_path}" + except Exception as e: + return False, f"Failed to delete hook: {e}" + + +def enable_hook(hook_path: str) -> tuple[bool, str]: + """Enable a hook.""" + return _set_hook_enabled(hook_path, True) + + +def disable_hook(hook_path: str) -> tuple[bool, str]: + """Disable a hook.""" + return _set_hook_enabled(hook_path, False) + + +def _set_hook_enabled(hook_path: str, enabled: bool) -> tuple[bool, str]: + """Set a hook's enabled state.""" + hooks_dir = _get_hooks_dir() + full_path = hooks_dir / hook_path + + if not full_path.exists(): + return False, "Hook not found" + + try: + with open(full_path, "r") as f: + data = yaml.safe_load(f) + + data["enabled"] = enabled + + with open(full_path, "w") as f: + yaml.dump(data, f, default_flow_style=False, sort_keys=False) + + invalidate_cache() + return True, f"{'Enabled' if enabled else 'Disabled'} hook: {hook_path}" + + except Exception as e: + return False, f"Failed to update hook: {e}" diff --git a/tools/reviewer_tool.py b/tools/reviewer_tool.py new file mode 100644 index 00000000..1416f56d --- /dev/null +++ b/tools/reviewer_tool.py @@ -0,0 +1,684 @@ +""" +Code Reviewer Tool + +Runs code quality checks for various languages. +Automatically detects language and runs appropriate linters, type checkers, and tests. + +Usage: + from tools.reviewer_tool import run_code_review + + result = run_code_review( + language="python", + paths=["src/"], + check_types=["lint", "typecheck", "test"] + ) +""" + +import json +import logging +import os +import subprocess +from dataclasses import dataclass, field +from pathlib import Path +from typing import Dict, List, Optional, Any, Tuple + +logger = logging.getLogger(__name__) + + +# ============================================================================= +# Language Configurations +# ============================================================================= + +LANGUAGE_CONFIG: Dict[str, Dict[str, Any]] = { + "python": { + "extensions": [".py"], + "lint": { + "commands": [ + ("ruff", ["ruff", "check", "."]), + ("flake8", ["flake8", "."]), + ], + "default": "ruff", + }, + "typecheck": { + "commands": [ + ("mypy", ["mypy", "."]), + ], + "default": "mypy", + }, + "format": { + "commands": [ + ("ruff format", ["ruff", "format", "--check", "."]), + ("black", ["black", "--check", "."]), + ("isort", ["isort", "--check-only", "."]), + ], + "default": "ruff format", + }, + "test": { + "commands": [ + ("pytest", ["pytest", "-v", "--tb=short"]), + ("pytest coverage", ["pytest", "--cov=.", "--cov-report=term-missing"]), + ], + "default": "pytest", + }, + "security": { + "commands": [ + ("bandit", ["bandit", "-r", "."]), + ("safety", ["safety", "check"]), + ], + "default": "bandit", + }, + }, + "javascript": { + "extensions": [".js", ".jsx", ".mjs"], + "lint": { + "commands": [ + ("eslint", ["npx", "eslint", "src/"]), + ("eslint fix", ["npx", "eslint", "src/", "--fix"]), + ], + "default": "eslint", + }, + "typecheck": { + "commands": [ + ("tsc", ["npx", "tsc", "--noEmit"]), + ], + "default": "tsc", + }, + "format": { + "commands": [ + ("prettier", ["npx", "prettier", "--check", "src/"]), + ], + "default": "prettier", + }, + "test": { + "commands": [ + ("jest", ["npm", "test"]), + ("vitest", ["npx", "vitest", "run"]), + ], + "default": "jest", + }, + }, + "typescript": { + "extensions": [".ts", ".tsx"], + "lint": { + "commands": [ + ("eslint", ["npx", "eslint", "src/", "--ext", ".ts,.tsx"]), + ], + "default": "eslint", + }, + "typecheck": { + "commands": [ + ("tsc", ["npx", "tsc", "--noEmit", "--project", "tsconfig.json"]), + ], + "default": "tsc", + }, + "format": { + "commands": [ + ("prettier", ["npx", "prettier", "--check", "src/"]), + ], + "default": "prettier", + }, + "test": { + "commands": [ + ("jest", ["npx", "jest"]), + ("vitest", ["npx", "vitest", "run"]), + ], + "default": "jest", + }, + }, + "go": { + "extensions": [".go"], + "lint": { + "commands": [ + ("golangci-lint", ["golangci-lint", "run", "./..."]), + ("gofmt", ["gofmt", "-d", "."]), + ], + "default": "golangci-lint", + }, + "typecheck": { + "commands": [ + ("go vet", ["go", "vet", "./..."]), + ], + "default": "go vet", + }, + "format": { + "commands": [ + ("gofmt", ["gofmt", "-d", "."]), + ("go fmt", ["go", "fmt", "./..."]), + ], + "default": "gofmt", + }, + "test": { + "commands": [ + ("go test", ["go", "test", "-v", "./..."]), + ("go test coverage", ["go", "test", "-coverprofile=coverage.out", "./..."]), + ], + "default": "go test", + }, + }, + "rust": { + "extensions": [".rs"], + "lint": { + "commands": [ + ("clippy", ["cargo", "clippy", "--", "-D", "warnings"]), + ], + "default": "clippy", + }, + "typecheck": { + "commands": [ + ("cargo check", ["cargo", "check"]), + ], + "default": "cargo check", + }, + "format": { + "commands": [ + ("cargo fmt", ["cargo", "fmt", "--", "--check"]), + ], + "default": "cargo fmt", + }, + "test": { + "commands": [ + ("cargo test", ["cargo", "test", "--", "--nocapture"]), + ], + "default": "cargo test", + }, + }, + "java": { + "extensions": [".java"], + "lint": { + "commands": [ + ("checkstyle", ["mvn", "checkstyle:check"]), + ], + "default": "checkstyle", + }, + "typecheck": { + "commands": [ + ("mvn compile", ["mvn", "compile"]), + ], + "default": "mvn compile", + }, + "format": { + "commands": [ + ("mvn formatter", ["mvn", "formatter:format"]), + ], + "default": "mvn formatter", + }, + "test": { + "commands": [ + ("mvn test", ["mvn", "test"]), + ], + "default": "mvn test", + }, + }, + "cpp": { + "extensions": [".cpp", ".hpp", ".h", ".cc"], + "lint": { + "commands": [ + ("clang-tidy", ["clang-tidy", "src/**"]), + ("cppcheck", ["cppcheck", "--enable=all", "src/"]), + ], + "default": "clang-tidy", + }, + "typecheck": { + "commands": [ + ("cmake build", ["cmake", "--build", "build", "--target", "all"]), + ], + "default": "cmake build", + }, + "format": { + "commands": [ + ("clang-format", ["clang-format", "-i", "src/**"]), + ], + "default": "clang-format", + }, + "test": { + "commands": [ + ("ctest", ["ctest", "--output-on-failure"]), + ], + "default": "ctest", + }, + }, + "csharp": { + "extensions": [".cs"], + "lint": { + "commands": [ + ("dotnet format", ["dotnet", "format", "--verify-no-changes"]), + ], + "default": "dotnet format", + }, + "typecheck": { + "commands": [ + ("dotnet build", ["dotnet", "build", "--no-restore"]), + ], + "default": "dotnet build", + }, + "format": { + "commands": [ + ("dotnet format", ["dotnet", "format", "--verify-no-changes"]), + ], + "default": "dotnet format", + }, + "test": { + "commands": [ + ("dotnet test", ["dotnet", "test", "--no-build"]), + ], + "default": "dotnet test", + }, + }, + "php": { + "extensions": [".php"], + "lint": { + "commands": [ + ("phpcs", ["./vendor/bin/phpcs", "--standard=PSR12", "src/"]), + ("phpstan", ["./vendor/bin/phpstan", "analyse", "src/", "--level=max"]), + ], + "default": "phpcs", + }, + "typecheck": { + "commands": [ + ("php -l", ["php", "-l", "src/"]), + ], + "default": "php -l", + }, + "format": { + "commands": [ + ("phpcbf", ["./vendor/bin/phpcbf", "--standard=PSR12", "src/"]), + ], + "default": "phpcbf", + }, + "test": { + "commands": [ + ("phpunit", ["./vendor/bin/phpunit"]), + ("pest", ["./vendor/bin/pest"]), + ], + "default": "phpunit", + }, + }, +} + + +# ============================================================================= +# Data Structures +# ============================================================================= + +@dataclass +class CheckResult: + """Result of a single check.""" + name: str + success: bool + command: str + output: str + duration_ms: float + error_count: int = 0 + warning_count: int = 0 + + +@dataclass +class ReviewResult: + """Result of a full code review.""" + language: str + paths: List[str] + success: bool + checks: List[CheckResult] + total_duration_ms: float + summary: str + blocked: bool = False # True if CRITICAL issues found + + def to_dict(self) -> Dict: + return { + "language": self.language, + "paths": self.paths, + "success": self.success, + "blocked": self.blocked, + "total_duration_ms": self.total_duration_ms, + "summary": self.summary, + "checks": [ + { + "name": c.name, + "success": c.success, + "command": c.command, + "error_count": c.error_count, + "warning_count": c.warning_count, + "output": c.output[:500] if len(c.output) > 500 else c.output, + } + for c in self.checks + ], + } + + +# ============================================================================= +# Core Functions +# ============================================================================= + +def detect_language(paths: List[str]) -> Optional[str]: + """Detect language from file extensions.""" + extension_map: Dict[str, str] = {} + for lang, config in LANGUAGE_CONFIG.items(): + for ext in config.get("extensions", []): + extension_map[ext] = lang + + detected = set() + for path in paths: + p = Path(path) + if p.is_file(): + ext = p.suffix + if ext in extension_map: + detected.add(extension_map[ext]) + elif p.is_dir(): + for ext, lang in extension_map.items(): + if list(p.rglob(f"*{ext}")): + detected.add(lang) + + if len(detected) == 1: + return list(detected)[0] + elif len(detected) > 1: + # Prefer more specific languages + priority = ["typescript", "javascript", "python", "rust", "go", "java", "cpp", "csharp", "php"] + for lang in priority: + if lang in detected: + return lang + return None + + +def run_command(cmd: List[str], timeout: int = 120) -> Tuple[int, str, str]: + """Run a command and return (returncode, stdout, stderr).""" + try: + result = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=timeout, + cwd=os.getcwd(), + ) + return result.returncode, result.stdout, result.stderr + except subprocess.TimeoutExpired: + return -1, "", "Command timed out" + except FileNotFoundError: + return -2, "", f"Command not found: {cmd[0]}" + except Exception as e: + return -3, "", str(e) + + +def parse_check_output(name: str, output: str, returncode: int) -> Tuple[bool, int, int]: + """Parse check output to determine success and counts.""" + if returncode == -2: + # Command not found - not an error + return True, 0, 0 + + if returncode == 0: + return True, 0, 0 + + # Try to parse error/warning counts from output + error_count = 0 + warning_count = 0 + + # Common patterns + output_lower = output.lower() + + if "error" in output_lower: + import re + errors = re.findall(r'\b(\d+)\s+error', output_lower) + if errors: + error_count = sum(int(e) for e in errors) + + if "warning" in output_lower: + warnings = re.findall(r'\b(\d+)\s+warning', output_lower) + if warnings: + warning_count = sum(int(w) for w in warnings) + + success = returncode == 0 + return success, error_count, warning_count + + +def run_check( + language: str, + check_type: str, + paths: List[str], + tool_name: Optional[str] = None, +) -> CheckResult: + """Run a single check for a language.""" + import time + + if language not in LANGUAGE_CONFIG: + return CheckResult( + name=f"{check_type}", + success=False, + command="", + output=f"Unknown language: {language}", + duration_ms=0, + ) + + config = LANGUAGE_CONFIG[language] + check_config = config.get(check_type) + + if not check_config: + return CheckResult( + name=f"{check_type}", + success=True, + command="", + output=f"No {check_type} configured for {language}", + duration_ms=0, + ) + + commands = check_config.get("commands", []) + if not commands: + return CheckResult( + name=f"{check_type}", + success=True, + command="", + output=f"No commands for {check_type}", + duration_ms=0, + ) + + # Find the requested tool or use default + if tool_name: + cmd_list = [c for c in commands if c[0] == tool_name] + if not cmd_list: + return CheckResult( + name=f"{check_type}", + success=False, + command="", + output=f"Tool {tool_name} not found for {check_type}", + duration_ms=0, + ) + cmd = cmd_list[0] + else: + # Use default tool + default_name = check_config.get("default", commands[0][0]) + cmd = next((c for c in commands if c[0] == default_name), commands[0]) + + # Expand paths + expanded_cmd = cmd[1] + if paths: + # Add paths to command if it doesn't already have them + if "." in expanded_cmd or any("$" not in c for c in expanded_cmd): + expanded_cmd = expanded_cmd + paths + + start_time = time.time() + returncode, stdout, stderr = run_command(expanded_cmd) + duration_ms = (time.time() - start_time) * 1000 + + output = stdout if stdout else stderr + success, error_count, warning_count = parse_check_output(cmd[0], output, returncode) + + return CheckResult( + name=f"{check_type}:{cmd[0]}", + success=success, + command=" ".join(expanded_cmd), + output=output, + duration_ms=duration_ms, + error_count=error_count, + warning_count=warning_count, + ) + + +def run_code_review( + language: Optional[str] = None, + paths: Optional[List[str]] = None, + check_types: Optional[List[str]] = None, + tools: Optional[Dict[str, str]] = None, +) -> ReviewResult: + """ + Run a full code review. + + Args: + language: Language to check (auto-detected if not provided) + paths: Files/directories to check (defaults to current directory) + check_types: Types of checks to run (defaults to lint, typecheck) + tools: Override specific tools, e.g., {"lint": "flake8", "test": "pytest"} + + Returns: + ReviewResult with all check results + """ + import time + + if paths is None: + paths = ["."] + + if language is None: + language = detect_language(paths) or "unknown" + + if check_types is None: + check_types = ["lint", "typecheck"] + + if tools is None: + tools = {} + + start_time = time.time() + checks = [] + total_errors = 0 + total_warnings = 0 + + for check_type in check_types: + tool_name = tools.get(check_type) + result = run_check(language, check_type, paths, tool_name) + checks.append(result) + total_errors += result.error_count + total_warnings += result.warning_count + + total_duration_ms = (time.time() - start_time) * 1000 + + # Determine overall success and blocked status + all_passed = all(c.success for c in checks) + has_critical = total_errors > 0 + + summary = f"{len(checks)} checks, {total_errors} errors, {total_warnings} warnings" + + return ReviewResult( + language=language, + paths=paths, + success=all_passed, + checks=checks, + total_duration_ms=total_duration_ms, + summary=summary, + blocked=has_critical and check_types == ["lint"], + ) + + +# ============================================================================= +# Tool Entry Point +# ============================================================================= + +def code_review_tool( + language: str = None, + paths: str = None, + check_types: str = None, + tools: str = None, +) -> str: + """ + Run code quality checks for a language. + + Args: + language: Language to check (python, javascript, typescript, go, rust, java, cpp, csharp, php) + paths: Comma-separated list of files/directories to check (default: current directory) + check_types: Comma-separated check types (lint, typecheck, format, test, security) + tools: JSON string mapping check types to specific tools, e.g., '{"lint": "ruff"}' + + Returns: + JSON string with ReviewResult + """ + # Parse inputs + lang = language.lower() if language else None + path_list = [p.strip() for p in paths.split(",")] if paths else ["."] + check_list = [c.strip() for c in check_types.split(",")] if check_types else ["lint", "typecheck"] + tool_map = json.loads(tools) if tools else {} + + # Run review + result = run_code_review( + language=lang, + paths=path_list, + check_types=check_list, + tools=tool_map, + ) + + return json.dumps(result.to_dict(), indent=2) + + +if __name__ == "__main__": + import sys + + if len(sys.argv) < 2: + print("Usage: python reviewer_tool.py [paths] [check_types]") + sys.exit(1) + + language = sys.argv[1] + paths = sys.argv[2:3] if len(sys.argv) > 2 else ["."] + check_types = sys.argv[3:4] if len(sys.argv) > 3 else ["lint"] + + result = run_code_review( + language=language, + paths=paths, + check_types=check_types, + ) + + print(json.dumps(result.to_dict(), indent=2)) + + +# ============================================================================= +# Registry Registration +# ============================================================================= + +from tools.registry import registry + +registry.register( + name="code_review", + toolset="reviewer", + schema={ + "name": "code_review", + "description": "Run code quality checks (lint, typecheck, test) for a specific language. " + "Supports Python, JavaScript, TypeScript, Go, Rust, Java, C++, C#, PHP. " + "Returns structured JSON with check results, error counts, and blocked status.", + "parameters": { + "type": "object", + "properties": { + "language": { + "type": "string", + "description": "Language to check (python, javascript, typescript, go, rust, java, cpp, csharp, php). " + "Auto-detected if not provided.", + }, + "paths": { + "type": "string", + "description": "Comma-separated list of files/directories to check. Defaults to current directory.", + }, + "check_types": { + "type": "string", + "description": "Comma-separated check types: lint, typecheck, format, test, security. " + "Defaults to 'lint,typecheck'.", + }, + "tools": { + "type": "string", + "description": "JSON string mapping check types to specific tools, e.g., '{\"lint\": \"ruff\", \"test\": \"pytest\"}'", + }, + }, + }, + }, + handler=lambda args, **kw: code_review_tool( + language=args.get("language"), + paths=args.get("paths"), + check_types=args.get("check_types"), + tools=args.get("tools"), + ), + check_fn=lambda: True, # Always available + requires_env=[], + description="Run code quality checks for a language", + emoji="🔍", +) diff --git a/toolsets.py b/toolsets.py index 2e7a0a92..f4124a7d 100644 --- a/toolsets.py +++ b/toolsets.py @@ -34,7 +34,7 @@ _HERMES_CORE_TOOLS = [ # Terminal + process management "terminal", "process", # File manipulation - "read_file", "write_file", "patch", "search_files", + "read_file", "write_file", "patch", "search_files", "hooks", # Vision + image generation "vision_analyze", "image_generate", # Skills @@ -56,6 +56,8 @@ _HERMES_CORE_TOOLS = [ "execute_code", "delegate_task", # Cronjob management "cronjob", + # Code review + "code_review", # Cross-platform messaging (gated on gateway running via check_fn) "send_message", # Home Assistant smart home control (gated on HASS_TOKEN via check_fn)