Compare commits

...

3 Commits

Author SHA1 Message Date
teknium1
b159002078 feat: multi-agent architecture — named agents with routing, tool policies, and isolated workspaces
Implements the full multi-agent system for Hermes Agent, allowing a single
installation to host multiple named agents, each with its own model,
personality, toolset, workspace, and session history.

## New Files

- gateway/agent_registry.py: AgentConfig, ToolPolicy, SubagentPolicy,
  AgentRegistry, TOOL_PROFILES (minimal/coding/messaging/full), and
  normalize_tool_config() for shorthand YAML parsing

- gateway/router.py: BindingRouter with 7-tier deterministic routing
  (chat_id > peer > guild+type > guild > platform+type > platform > default)

## Core Changes

- model_tools.py: get_tool_definitions() accepts agent_tool_policy for
  per-agent tool filtering; handle_function_call() extended enabled_tools
  check to gate ALL tool calls (defense-in-depth)

- gateway/session.py: build_session_key() now accepts agent_id and dm_scope
  parameters, replacing hardcoded 'agent:main' with 'agent:{agent_id}'

- tools/memory_tool.py: MemoryStore accepts memory_dir parameter for
  per-agent memory isolation

- agent/prompt_builder.py: build_context_files_prompt() accepts
  agent_workspace for SOUL.md lookup; build_skills_system_prompt()
  accepts agent_skills_dir for per-agent skill overlay

- run_agent.py: AIAgent accepts agent_tool_policy and agent_workspace,
  passes policy through to get_tool_definitions()

- gateway/run.py: Initializes AgentRegistry + BindingRouter, resolves
  agent per-message in _handle_message(), passes config to _run_agent(),
  adds /agents command

- cli.py: --agent flag for selecting named agent profiles, /agents
  slash command, agent config override for model/personality/tools

- hermes_cli/config.py: agents/bindings in DEFAULT_CONFIG, version 7

- tools/delegate_tool.py: Configurable max_depth per-agent, tool policy
  inheritance from parent to child

## Config Format

agents:
  main:
    default: true
  coder:
    model: anthropic/claude-sonnet-4
    personality: 'You are a coding assistant.'
    tools: coding  # or [tool1, tool2] or {profile: x, deny: [...]}

bindings:
  - agent: coder
    telegram: '-100123456'

## Tests

168 new tests across 3 test files (agent_registry, router, integration).
All 3106 tests pass.
2026-03-11 03:21:12 -07:00
teknium1
1115e35aae test: add tests for subagent model config override
4 new tests verifying:
- Subagent inherits parent model by default
- Config model overrides parent model
- Explicit model arg overrides config
- Graceful fallback when CLI_CONFIG unavailable
2026-03-10 23:48:55 -07:00
Bartok Moltbot
6bd1726422 feat(subagent): add configurable subagent model via config.yaml
Allow users to configure a dedicated model for subagents spawned by
delegate_task, so narrowly-scoped subtasks can use a cheaper/faster
model while the parent agent runs on a more powerful one.

Config:
  subagent:
    model: google/gemini-3-flash-preview

Precedence: explicit model arg > config.subagent.model > parent model.

Cherry-picked from PR #751 by Bartok9, rebased onto current main
with conflict resolution and simplified to model-only override
(provider/base_url/api_key stay inherited from parent — covers the
common case of same-provider model swap via OpenRouter).

Closes #609

Co-authored-by: Bartok Moltbot <bartokmoltbot@users.noreply.github.com>
2026-03-10 23:45:05 -07:00
18 changed files with 2585 additions and 54 deletions

View File

@@ -179,10 +179,12 @@ def _skill_is_platform_compatible(skill_file: Path) -> bool:
return True # Err on the side of showing the skill return True # Err on the side of showing the skill
def build_skills_system_prompt() -> str: def build_skills_system_prompt(agent_skills_dir: Optional[Path] = None) -> str:
"""Build a compact skill index for the system prompt. """Build a compact skill index for the system prompt.
Scans ~/.hermes/skills/ for SKILL.md files grouped by category. Scans ~/.hermes/skills/ for SKILL.md files grouped by category.
When agent_skills_dir is provided and exists, also scans it for SKILL.md
files. Agent skills take priority (listed first in each category).
Includes per-skill descriptions from frontmatter so the model can Includes per-skill descriptions from frontmatter so the model can
match skills by meaning, not just name. match skills by meaning, not just name.
Filters out skills incompatible with the current OS platform. Filters out skills incompatible with the current OS platform.
@@ -190,26 +192,22 @@ def build_skills_system_prompt() -> str:
hermes_home = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes")) hermes_home = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
skills_dir = hermes_home / "skills" skills_dir = hermes_home / "skills"
if not skills_dir.exists():
return ""
# Collect skills with descriptions, grouped by category # Collect skills with descriptions, grouped by category
# Each entry: (skill_name, description) # Each entry: (skill_name, description)
# Supports sub-categories: skills/mlops/training/axolotl/SKILL.md # Supports sub-categories: skills/mlops/training/axolotl/SKILL.md
# → category "mlops/training", skill "axolotl" # → category "mlops/training", skill "axolotl"
skills_by_category: dict[str, list[tuple[str, str]]] = {} skills_by_category: dict[str, list[tuple[str, str]]] = {}
for skill_file in skills_dir.rglob("SKILL.md"):
# Skip skills incompatible with the current OS platform def _scan_skills_dir(scan_dir: Path):
"""Scan a directory for SKILL.md files and add them to skills_by_category."""
if not scan_dir.exists():
return
for skill_file in scan_dir.rglob("SKILL.md"):
if not _skill_is_platform_compatible(skill_file): if not _skill_is_platform_compatible(skill_file):
continue continue
rel_path = skill_file.relative_to(skills_dir) rel_path = skill_file.relative_to(scan_dir)
parts = rel_path.parts parts = rel_path.parts
if len(parts) >= 2: if len(parts) >= 2:
# Category is everything between skills_dir and the skill folder
# e.g. parts = ("mlops", "training", "axolotl", "SKILL.md")
# → category = "mlops/training", skill_name = "axolotl"
# e.g. parts = ("github", "github-auth", "SKILL.md")
# → category = "github", skill_name = "github-auth"
skill_name = parts[-2] skill_name = parts[-2]
category = "/".join(parts[:-2]) if len(parts) > 2 else parts[0] category = "/".join(parts[:-2]) if len(parts) > 2 else parts[0]
else: else:
@@ -218,6 +216,13 @@ def build_skills_system_prompt() -> str:
desc = _read_skill_description(skill_file) desc = _read_skill_description(skill_file)
skills_by_category.setdefault(category, []).append((skill_name, desc)) skills_by_category.setdefault(category, []).append((skill_name, desc))
# Agent skills first (so they appear first / take priority)
if agent_skills_dir is not None:
_scan_skills_dir(agent_skills_dir)
# Then global skills
_scan_skills_dir(skills_dir)
if not skills_by_category: if not skills_by_category:
return "" return ""
@@ -284,11 +289,11 @@ def _truncate_content(content: str, filename: str, max_chars: int = CONTEXT_FILE
return head + marker + tail return head + marker + tail
def build_context_files_prompt(cwd: Optional[str] = None) -> str: def build_context_files_prompt(cwd: Optional[str] = None, agent_workspace: Optional[Path] = None) -> str:
"""Discover and load context files for the system prompt. """Discover and load context files for the system prompt.
Discovery: AGENTS.md (recursive), .cursorrules / .cursor/rules/*.mdc, Discovery: AGENTS.md (recursive), .cursorrules / .cursor/rules/*.mdc,
SOUL.md (cwd then ~/.hermes/ fallback). Each capped at 20,000 chars. SOUL.md (agent_workspace then cwd then ~/.hermes/ fallback). Each capped at 20,000 chars.
""" """
if cwd is None: if cwd is None:
cwd = os.getcwd() cwd = os.getcwd()
@@ -356,8 +361,15 @@ def build_context_files_prompt(cwd: Optional[str] = None) -> str:
cursorrules_content = _truncate_content(cursorrules_content, ".cursorrules") cursorrules_content = _truncate_content(cursorrules_content, ".cursorrules")
sections.append(cursorrules_content) sections.append(cursorrules_content)
# SOUL.md (cwd first, then ~/.hermes/ fallback) # SOUL.md (agent_workspace first, then cwd, then ~/.hermes/ fallback)
soul_path = None soul_path = None
if agent_workspace is not None:
for name in ["SOUL.md", "soul.md"]:
candidate = agent_workspace / name
if candidate.exists():
soul_path = candidate
break
if not soul_path:
for name in ["SOUL.md", "soul.md"]: for name in ["SOUL.md", "soul.md"]:
candidate = cwd_path / name candidate = cwd_path / name
if candidate.exists(): if candidate.exists():

43
cli.py
View File

@@ -1090,6 +1090,7 @@ class HermesCLI:
compact: bool = False, compact: bool = False,
resume: str = None, resume: str = None,
checkpoints: bool = False, checkpoints: bool = False,
agent: str = None,
): ):
""" """
Initialize the Hermes CLI. Initialize the Hermes CLI.
@@ -1208,6 +1209,31 @@ class HermesCLI:
fb = CLI_CONFIG.get("fallback_model") or {} fb = CLI_CONFIG.get("fallback_model") or {}
self._fallback_model = fb if fb.get("provider") and fb.get("model") else None self._fallback_model = fb if fb.get("provider") and fb.get("model") else None
# Multi-agent: resolve agent config from config.yaml
self._agent_id = agent or 'main'
self._agent_config = None
self._agent_tool_policy = None
self._agent_workspace = None
agents_config = CLI_CONFIG.get('agents', {})
if agents_config and self._agent_id in agents_config:
from gateway.agent_registry import AgentRegistry
registry = AgentRegistry({'agents': agents_config})
self._agent_config = registry.get(self._agent_id)
if self._agent_config:
# Override model/provider/personality from agent config
if self._agent_config.model:
self.model = self._agent_config.model
if self._agent_config.provider:
self.provider = self._agent_config.provider
if self._agent_config.personality:
self._ephemeral_system_prompt = self._agent_config.personality
if self._agent_config.max_turns:
self.max_turns = self._agent_config.max_turns
if self._agent_config.toolsets:
self._toolsets = self._agent_config.toolsets
self._agent_tool_policy = self._agent_config.tool_policy
self._agent_workspace = str(self._agent_config.workspace_dir)
# Agent will be initialized on first use # Agent will be initialized on first use
self.agent: Optional[AIAgent] = None self.agent: Optional[AIAgent] = None
self._app = None # prompt_toolkit Application (set in run()) self._app = None # prompt_toolkit Application (set in run())
@@ -1489,6 +1515,8 @@ class HermesCLI:
thinking_callback=self._on_thinking, thinking_callback=self._on_thinking,
checkpoints_enabled=self.checkpoints_enabled, checkpoints_enabled=self.checkpoints_enabled,
checkpoint_max_snapshots=self.checkpoint_max_snapshots, checkpoint_max_snapshots=self.checkpoint_max_snapshots,
agent_tool_policy=self._agent_tool_policy,
agent_workspace=self._agent_workspace,
) )
# Apply any pending title now that the session exists in the DB # Apply any pending title now that the session exists in the DB
if self._pending_title and self._session_db: if self._pending_title and self._session_db:
@@ -2545,6 +2573,19 @@ class HermesCLI:
self.show_tools() self.show_tools()
elif cmd_lower == "/toolsets": elif cmd_lower == "/toolsets":
self.show_toolsets() self.show_toolsets()
elif cmd_lower == "/agents":
agents_config = CLI_CONFIG.get('agents', {})
if not agents_config:
print('No agents configured. Add agents to config.yaml.')
else:
print('\n📋 Configured Agents:\n')
for name, cfg in agents_config.items():
marker = ' (active)' if name == self._agent_id else ''
model = cfg.get('model', '(inherited)')
desc = cfg.get('description', '')
print(f' {name}{marker} {model} {desc}')
print()
return True
elif cmd_lower == "/config": elif cmd_lower == "/config":
self.show_config() self.show_config()
elif cmd_lower == "/clear": elif cmd_lower == "/clear":
@@ -4365,6 +4406,7 @@ def main(
worktree: bool = False, worktree: bool = False,
w: bool = False, w: bool = False,
checkpoints: bool = False, checkpoints: bool = False,
agent: str = None,
): ):
""" """
Hermes Agent CLI - Interactive AI Assistant Hermes Agent CLI - Interactive AI Assistant
@@ -4470,6 +4512,7 @@ def main(
compact=compact, compact=compact,
resume=resume, resume=resume,
checkpoints=checkpoints, checkpoints=checkpoints,
agent=agent,
) )
# Inject worktree context into agent's system prompt # Inject worktree context into agent's system prompt

504
gateway/agent_registry.py Normal file
View File

@@ -0,0 +1,504 @@
"""
Agent registry for multi-agent support.
Manages agent configurations, tool policies, and workspace resolution.
Each agent has its own identity, model settings, tool access, and workspace.
"""
from __future__ import annotations
import logging
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Set
logger = logging.getLogger(__name__)
HERMES_HOME = Path.home() / ".hermes"
# ---------------------------------------------------------------------------
# Tool profiles -- predefined sets of allowed tools
# ---------------------------------------------------------------------------
TOOL_PROFILES: Dict[str, Dict[str, Any]] = {
"minimal": {
"allow": [
"clarify",
"memory",
"todo",
"session_search",
],
},
"coding": {
"allow": [
"terminal",
"process",
"read_file",
"write_file",
"patch",
"search_files",
"web_search",
"web_extract",
"memory",
"todo",
"clarify",
"session_search",
"delegate_task",
"execute_code",
"vision_analyze",
],
},
"messaging": {
"allow": [
"web_search",
"web_extract",
"memory",
"todo",
"clarify",
"session_search",
"send_message",
"text_to_speech",
"image_generate",
],
},
"full": {}, # No restrictions
}
# Valid agent ID pattern: starts with lowercase letter/digit, rest can include _ and -
_AGENT_ID_RE = re.compile(r"^[a-z0-9][a-z0-9_-]{0,63}$")
# ---------------------------------------------------------------------------
# ToolPolicy
# ---------------------------------------------------------------------------
@dataclass
class ToolPolicy:
"""
Declarative tool access policy for an agent.
Resolution pipeline (applied in order):
1. Start with the profile's allow-list (or all tools if no profile / 'full').
2. Add any names from ``also_allow``.
3. If an explicit ``allow`` list is set, intersect with it.
4. Remove any names from ``deny`` (deny always wins).
"""
profile: Optional[str] = None
allow: Optional[List[str]] = None
also_allow: Optional[List[str]] = None
deny: Optional[List[str]] = None
def apply(self, tools: Set[str]) -> Set[str]:
"""
Filter a set of tool names according to this policy.
The pipeline is: profile -> also_allow -> allow -> deny.
Deny always wins — denied tools are removed regardless of other rules.
Parameters
----------
tools:
The full set of available tool names.
Returns
-------
Set[str]
The subset of tools this agent is permitted to use.
"""
# Step 1: Start from profile
if self.profile and self.profile in TOOL_PROFILES:
profile_def = TOOL_PROFILES[self.profile]
if "allow" in profile_def:
result = tools & set(profile_def["allow"])
else:
# Profile like 'full' with no allow list => all tools
result = set(tools)
else:
# No profile => start with all tools
result = set(tools)
# Step 2: Additive extras from also_allow
if self.also_allow:
result |= tools & set(self.also_allow)
# Step 3: Explicit allow list narrows the result
if self.allow is not None:
result &= set(self.allow)
# Step 4: Deny always wins
if self.deny:
result -= set(self.deny)
return result
# ---------------------------------------------------------------------------
# SubagentPolicy
# ---------------------------------------------------------------------------
@dataclass
class SubagentPolicy:
"""Controls how an agent may spawn sub-agents."""
max_depth: int = 2
max_children: int = 5
model: Optional[str] = None
# ---------------------------------------------------------------------------
# AgentConfig
# ---------------------------------------------------------------------------
@dataclass
class AgentConfig:
"""
Full configuration for a single agent persona.
Attributes
----------
id:
Unique identifier (lowercase, alphanumeric + hyphens/underscores).
description:
Human-readable description of this agent's purpose.
default:
Whether this is the default agent (exactly one must be default).
model:
LLM model identifier. ``None`` inherits the global default.
provider:
LLM provider name (e.g. ``'anthropic'``, ``'openai'``).
personality:
Inline personality/system prompt text, or path to a file.
workspace:
Custom workspace directory override. ``None`` uses the default.
toolsets:
List of toolset names to load (overrides platform default).
tool_policy:
Declarative tool access restrictions.
reasoning:
Provider-specific reasoning/thinking configuration dict.
max_turns:
Maximum agentic loop iterations per request.
sandbox:
Sandbox/isolation configuration dict.
fallback_model:
Fallback model configuration dict (used on primary failure).
memory_enabled:
Whether long-term memory is active for this agent.
subagents:
Sub-agent spawning policy.
dm_scope:
Which agent handles DMs on messaging platforms (``'main'`` by default).
"""
id: str
description: str = ""
default: bool = False
model: Optional[str] = None
provider: Optional[str] = None
personality: Optional[str] = None
workspace: Optional[str] = None
toolsets: Optional[List[str]] = None
tool_policy: Optional[ToolPolicy] = None
reasoning: Optional[Dict[str, Any]] = None
max_turns: Optional[int] = None
sandbox: Optional[Dict[str, Any]] = None
fallback_model: Optional[Dict[str, Any]] = None
memory_enabled: bool = True
subagents: SubagentPolicy = field(default_factory=SubagentPolicy)
dm_scope: str = "main"
# -- derived paths -------------------------------------------------------
@property
def workspace_dir(self) -> Path:
"""Agent-specific workspace directory."""
if self.workspace:
return Path(self.workspace).expanduser()
return HERMES_HOME / "agents" / self.id
@property
def sessions_dir(self) -> Path:
"""Directory for this agent's session data."""
return self.workspace_dir / "sessions"
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def normalize_tool_config(raw: Any) -> Optional[ToolPolicy]:
"""
Coerce various shorthand forms into a ``ToolPolicy``.
Accepted inputs::
None -> None
"coding" -> ToolPolicy(profile="coding")
["read_file", …] -> ToolPolicy(allow=[…])
{profile: …, …} -> ToolPolicy(**dict)
Parameters
----------
raw:
Raw tool policy value from configuration.
Returns
-------
Optional[ToolPolicy]
"""
if raw is None:
return None
if isinstance(raw, str):
return ToolPolicy(profile=raw)
if isinstance(raw, list):
return ToolPolicy(allow=raw)
if isinstance(raw, dict):
return ToolPolicy(
profile=raw.get("profile"),
allow=raw.get("allow"),
also_allow=raw.get("also_allow"),
deny=raw.get("deny"),
)
raise TypeError(f"Invalid tool_policy value: {raw!r}")
def _validate_agent_id(agent_id: str) -> None:
"""Raise ``ValueError`` if *agent_id* is not a valid identifier."""
if not _AGENT_ID_RE.match(agent_id):
raise ValueError(
f"Invalid agent id {agent_id!r}. Must match "
f"[a-z0-9][a-z0-9_-]{{0,63}}"
)
# ---------------------------------------------------------------------------
# AgentRegistry
# ---------------------------------------------------------------------------
class AgentRegistry:
"""
Registry of configured agent personas.
Parses the ``agents`` section of the top-level config dict and exposes
lookup / resolution helpers used by the runtime.
"""
def __init__(self, config: dict, global_config: dict = None) -> None:
self._agents: Dict[str, AgentConfig] = {}
self._default_id: str = "main"
self._parse_agents(config, global_config)
# -- parsing -------------------------------------------------------------
def _parse_agents(self, config: dict, global_config: dict = None) -> None:
"""
Parse ``config['agents']`` into ``AgentConfig`` instances.
If the config has no ``agents`` key an implicit *main* agent is
created from ``global_config`` so the system always has at least
one agent.
Parameters
----------
config:
Config dict that may contain an ``agents`` key with a flat dict
of agent definitions keyed by name.
global_config:
Top-level global config dict used to populate the implicit
*main* agent when no ``agents`` key is present.
"""
agents_raw: Optional[Dict[str, Any]] = config.get("agents")
if not agents_raw:
# Implicit single-agent setup — derive from global_config
gc = global_config or {}
main = AgentConfig(
id="main",
default=True,
model=gc.get("model"),
provider=gc.get("provider"),
personality=gc.get("personality"),
tool_policy=normalize_tool_config(gc.get("tools")),
reasoning=gc.get("reasoning"),
max_turns=gc.get("max_turns"),
memory_enabled=gc.get("memory_enabled", True),
)
self._agents = {"main": main}
self._default_id = "main"
return
agents: Dict[str, AgentConfig] = {}
seen_ids: Set[str] = set()
default_id: Optional[str] = None
first_id: Optional[str] = None
for name, agent_data in agents_raw.items():
if agent_data is None:
agent_data = {}
agent_id = agent_data.get("id", name)
_validate_agent_id(agent_id)
if agent_id in seen_ids:
raise ValueError(f"Duplicate agent id: {agent_id!r}")
seen_ids.add(agent_id)
if first_id is None:
first_id = agent_id
# Normalize the tools / tool_policy field
tool_policy = normalize_tool_config(
agent_data.get("tools", agent_data.get("tool_policy"))
)
subagent_raw = agent_data.get("subagents")
if isinstance(subagent_raw, dict):
subagent_policy = SubagentPolicy(**subagent_raw)
else:
subagent_policy = SubagentPolicy()
is_default = agent_data.get("default", False)
agent_cfg = AgentConfig(
id=agent_id,
description=agent_data.get("description", ""),
default=is_default,
model=agent_data.get("model"),
provider=agent_data.get("provider"),
personality=agent_data.get("personality"),
workspace=agent_data.get("workspace"),
toolsets=agent_data.get("toolsets"),
tool_policy=tool_policy,
reasoning=agent_data.get("reasoning"),
max_turns=agent_data.get("max_turns"),
sandbox=agent_data.get("sandbox"),
fallback_model=agent_data.get("fallback_model"),
memory_enabled=agent_data.get("memory_enabled", True),
subagents=subagent_policy,
dm_scope=agent_data.get("dm_scope", "main"),
)
if is_default:
if default_id is not None:
raise ValueError(
f"Multiple default agents: {default_id!r} and {agent_id!r}"
)
default_id = agent_id
agents[agent_id] = agent_cfg
# If nobody was explicitly marked default, the first agent wins
if default_id is None and first_id is not None:
default_id = first_id
agents[first_id].default = True
logger.debug(
"No explicit default agent; using first: %s", first_id
)
self._agents = agents
self._default_id = default_id or "main"
# -- public API ----------------------------------------------------------
def get(self, agent_id: str) -> AgentConfig:
"""
Return the config for *agent_id*, falling back to the default agent.
"""
return self._agents.get(agent_id, self.get_default())
def get_default(self) -> AgentConfig:
"""Return the default agent configuration."""
return self._agents[self._default_id]
def list_agents(self) -> List[AgentConfig]:
"""Return all registered agent configurations."""
return list(self._agents.values())
# -- resolution helpers --------------------------------------------------
def resolve_personality(self, agent: AgentConfig) -> Optional[str]:
"""
Resolve the personality/system-prompt text for *agent*.
Resolution order:
1. ``agent.personality`` field (inline text or file path).
2. ``SOUL.md`` in the agent's workspace directory.
3. Global ``~/.hermes/SOUL.md`` (only for the *main* agent).
4. ``None``.
"""
# 1. Explicit personality in config
if agent.personality:
personality_path = Path(agent.personality).expanduser()
if personality_path.is_file():
try:
return personality_path.read_text(encoding="utf-8").strip()
except OSError:
logger.warning(
"Could not read personality file: %s", personality_path
)
# Treat as inline text
return agent.personality
# 2. Workspace SOUL.md
workspace_soul = agent.workspace_dir / "SOUL.md"
if workspace_soul.is_file():
try:
return workspace_soul.read_text(encoding="utf-8").strip()
except OSError:
logger.warning(
"Could not read workspace SOUL.md: %s", workspace_soul
)
# 3. Global SOUL.md (main agent only)
if agent.id == "main":
global_soul = HERMES_HOME / "SOUL.md"
if global_soul.is_file():
try:
return global_soul.read_text(encoding="utf-8").strip()
except OSError:
logger.warning(
"Could not read global SOUL.md: %s", global_soul
)
# 4. Nothing
return None
def resolve_toolsets(
self, agent: AgentConfig, platform: str
) -> Optional[List[str]]:
"""
Determine which toolsets to load for *agent* on *platform*.
Returns the agent's explicit ``toolsets`` list if set, otherwise
``None`` to let the caller fall back to the platform's default
toolset configuration.
Parameters
----------
agent:
The agent whose toolsets to resolve.
platform:
The platform name (e.g. ``'telegram'``, ``'local'``).
Returns
-------
Optional[List[str]]
Ordered list of toolset names, or ``None`` for platform default.
"""
if agent.toolsets is not None:
return list(agent.toolsets)
return None
@staticmethod
def ensure_workspace(agent: AgentConfig) -> None:
"""
Create the agent's workspace and session directories if they
do not already exist.
"""
agent.workspace_dir.mkdir(parents=True, exist_ok=True)
agent.sessions_dir.mkdir(parents=True, exist_ok=True)
logger.debug(
"Ensured workspace for agent %s: %s", agent.id, agent.workspace_dir
)

195
gateway/router.py Normal file
View File

@@ -0,0 +1,195 @@
"""Binding router for multi-agent message routing.
Maps incoming messages to agent IDs based on platform, chat, guild, and
other session-source fields. Bindings are ranked by specificity so that
the most precise rule always wins.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
# ── constants ────────────────────────────────────────────────────────────
PLATFORM_NAMES: set[str] = {
"telegram",
"discord",
"slack",
"whatsapp",
"signal",
"homeassistant",
}
_KEY_EXPANSION: Dict[str, str] = {
"guild": "guild_id",
"type": "chat_type",
"team": "team_id",
"peer": "peer",
}
# ── data ─────────────────────────────────────────────────────────────────
@dataclass(frozen=True, slots=True)
class Binding:
"""A single routing rule that maps a match pattern to an agent."""
agent_id: str
match: Dict[str, str] = field(default_factory=dict)
tier: int = 7 # computed priority (1 = most specific)
# ── helpers ──────────────────────────────────────────────────────────────
def _assign_tier(match: Dict[str, str]) -> int:
"""Return a priority tier (17) based on how specific *match* is.
Lower tier number means higher priority (more specific).
Tier 1: platform + chat_id (exact channel)
Tier 2: platform + peer (exact DM user)
Tier 3: platform + guild_id + chat_type
Tier 4: platform + (guild_id | team_id)
Tier 5: platform + chat_type
Tier 6: platform only
Tier 7: fallback (empty match)
"""
keys = set(match.keys()) - {"platform"}
if not match:
return 7
if "chat_id" in keys:
return 1
if "peer" in keys:
return 2
if "guild_id" in keys and "chat_type" in keys:
return 3
if "guild_id" in keys or "team_id" in keys:
return 4
if "chat_type" in keys:
return 5
if "platform" in match:
return 6
return 7
def normalize_binding(raw: dict) -> Binding:
"""Normalise a shorthand binding dict into a :class:`Binding`.
Accepted shorthand formats::
{"agent": "coder", "telegram": "-100123"}
→ Binding(agent_id="coder",
match={"platform": "telegram", "chat_id": "-100123"})
{"agent": "assistant", "whatsapp": "*"}
→ Binding(agent_id="assistant",
match={"platform": "whatsapp"})
{"agent": "coder", "discord": {"guild": "123", "type": "channel"}}
→ Binding(agent_id="coder",
match={"platform": "discord",
"guild_id": "123", "chat_type": "channel"})
"""
agent_id: str = raw.get("agent", raw.get("agent_id", ""))
if not agent_id:
raise ValueError(f"Binding missing 'agent' key: {raw!r}")
match: Dict[str, str] = {}
for platform in PLATFORM_NAMES:
if platform not in raw:
continue
value: Any = raw[platform]
match["platform"] = platform
if isinstance(value, str):
if value != "*":
match["chat_id"] = value
elif isinstance(value, dict):
for short_key, expanded_key in _KEY_EXPANSION.items():
if short_key in value:
match[expanded_key] = str(value[short_key])
# Pass through any keys that are already in expanded form
for k, v in value.items():
if k not in _KEY_EXPANSION:
match[k] = str(v)
else:
raise TypeError(
f"Unsupported value type for platform '{platform}': "
f"{type(value).__name__}"
)
break # only one platform key per binding
tier = _assign_tier(match)
return Binding(agent_id=agent_id, match=match, tier=tier)
# ── router ───────────────────────────────────────────────────────────────
class BindingRouter:
"""Route incoming messages to agent IDs based on binding rules.
Parameters
----------
bindings_config:
A list of raw binding dicts (shorthand format).
default_agent_id:
Fallback agent ID when no binding matches.
"""
def __init__(self, bindings_config: list, default_agent_id: str) -> None:
self._default_agent_id: str = default_agent_id
self._bindings: List[Binding] = sorted(
(normalize_binding(raw) for raw in bindings_config),
key=lambda b: b.tier,
)
# ── public API ───────────────────────────────────────────────────
def resolve(
self,
platform: str,
chat_id: Optional[str] = None,
chat_type: Optional[str] = None,
user_id: Optional[str] = None,
guild_id: Optional[str] = None,
team_id: Optional[str] = None,
) -> str:
"""Return the agent ID for the most specific matching binding.
Iterates bindings in tier order (most specific first). The first
match wins. Falls back to *default_agent_id* if nothing matches.
"""
kwargs: Dict[str, Optional[str]] = {
"platform": platform,
"chat_id": chat_id,
"chat_type": chat_type,
"user_id": user_id,
"guild_id": guild_id,
"team_id": team_id,
}
for binding in self._bindings:
if self._matches(binding, **kwargs):
return binding.agent_id
return self._default_agent_id
# ── internals ────────────────────────────────────────────────────
@staticmethod
def _matches(binding: Binding, **kwargs: Optional[str]) -> bool:
"""Check whether *binding* matches the supplied keyword arguments.
Uses AND semantics: every key present in ``binding.match`` must
equal the corresponding value in *kwargs*. Keys absent from the
binding act as wildcards (always match).
"""
for key, required_value in binding.match.items():
actual = kwargs.get(key)
if actual is None:
return False
if str(actual) != str(required_value):
return False
return True

View File

@@ -161,6 +161,8 @@ from gateway.session import (
) )
from gateway.delivery import DeliveryRouter, DeliveryTarget from gateway.delivery import DeliveryRouter, DeliveryTarget
from gateway.platforms.base import BasePlatformAdapter, MessageEvent, MessageType from gateway.platforms.base import BasePlatformAdapter, MessageEvent, MessageType
from gateway.agent_registry import AgentRegistry, AgentConfig
from gateway.router import BindingRouter
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -207,6 +209,24 @@ class GatewayRunner:
self._provider_routing = self._load_provider_routing() self._provider_routing = self._load_provider_routing()
self._fallback_model = self._load_fallback_model() self._fallback_model = self._load_fallback_model()
# Load raw config dict for multi-agent support
self._raw_config: dict = {}
try:
import yaml as _y
_cfg_path = _hermes_home / 'config.yaml'
if _cfg_path.exists():
with open(_cfg_path, encoding='utf-8') as _f:
self._raw_config = _y.safe_load(_f) or {}
except Exception:
pass
# Multi-agent registry and router
self._agent_registry = AgentRegistry(self._raw_config, global_config=self._raw_config)
self._router = BindingRouter(
self._raw_config.get('bindings', []),
self._agent_registry.get_default().id,
)
# Wire process registry into session store for reset protection # Wire process registry into session store for reset protection
from tools.process_registry import process_registry from tools.process_registry import process_registry
self.session_store = SessionStore( self.session_store = SessionStore(
@@ -785,10 +805,19 @@ class GatewayRunner:
) )
return None return None
# Resolve which agent handles this message
agent_id = self._router.resolve(
platform=event.source.platform.value if hasattr(event.source, 'platform') else 'cli',
chat_id=getattr(event.source, 'chat_id', None),
chat_type=getattr(event.source, 'chat_type', None),
user_id=getattr(event.source, 'user_id', None),
)
agent_config = self._agent_registry.get(agent_id)
# PRIORITY: If an agent is already running for this session, interrupt it # PRIORITY: If an agent is already running for this session, interrupt it
# immediately. This is before command parsing to minimize latency -- the # immediately. This is before command parsing to minimize latency -- the
# user's "stop" message reaches the agent as fast as possible. # user's "stop" message reaches the agent as fast as possible.
_quick_key = build_session_key(source) _quick_key = build_session_key(source, agent_id=agent_id)
if _quick_key in self._running_agents: if _quick_key in self._running_agents:
running_agent = self._running_agents[_quick_key] running_agent = self._running_agents[_quick_key]
logger.debug("PRIORITY interrupt for session %s", _quick_key[:20]) logger.debug("PRIORITY interrupt for session %s", _quick_key[:20])
@@ -806,7 +835,8 @@ class GatewayRunner:
_known_commands = {"new", "reset", "help", "status", "stop", "model", _known_commands = {"new", "reset", "help", "status", "stop", "model",
"personality", "retry", "undo", "sethome", "set-home", "personality", "retry", "undo", "sethome", "set-home",
"compress", "usage", "insights", "reload-mcp", "reload_mcp", "compress", "usage", "insights", "reload-mcp", "reload_mcp",
"update", "title", "resume", "provider", "rollback"} "update", "title", "resume", "provider", "rollback",
"agents"}
if command and command in _known_commands: if command and command in _known_commands:
await self.hooks.emit(f"command:{command}", { await self.hooks.emit(f"command:{command}", {
"platform": source.platform.value if source.platform else "", "platform": source.platform.value if source.platform else "",
@@ -869,6 +899,18 @@ class GatewayRunner:
if command == "rollback": if command == "rollback":
return await self._handle_rollback_command(event) return await self._handle_rollback_command(event)
if command == "agents":
agent_lines = []
for ac in self._agent_registry.list_agents():
marker = ' *' if ac.default else ''
model = ac.model or '(inherited)'
agent_lines.append(f' {ac.id}{marker} {model} {ac.description}')
response = f"📋 Agents:\n" + '\n'.join(agent_lines) + f"\n\n🤖 This chat → {agent_id}"
adapter = self.adapters.get(source.platform)
if adapter:
await adapter.send(source.chat_id, response)
return response
# Skill slash commands: /skill-name loads the skill and sends to agent # Skill slash commands: /skill-name loads the skill and sends to agent
if command: if command:
try: try:
@@ -885,7 +927,7 @@ class GatewayRunner:
logger.debug("Skill command check failed (non-fatal): %s", e) logger.debug("Skill command check failed (non-fatal): %s", e)
# Check for pending exec approval responses # Check for pending exec approval responses
session_key_preview = build_session_key(source) session_key_preview = build_session_key(source, agent_id=agent_id)
if session_key_preview in self._pending_approvals: if session_key_preview in self._pending_approvals:
user_text = event.text.strip().lower() user_text = event.text.strip().lower()
if user_text in ("yes", "y", "approve", "ok", "go", "do it"): if user_text in ("yes", "y", "approve", "ok", "go", "do it"):
@@ -1269,7 +1311,8 @@ class GatewayRunner:
history=history, history=history,
source=source, source=source,
session_id=session_entry.session_id, session_id=session_entry.session_id,
session_key=session_key session_key=session_key,
agent_config=agent_config,
) )
response = agent_result.get("final_response", "") response = agent_result.get("final_response", "")
@@ -2525,7 +2568,8 @@ class GatewayRunner:
history: List[Dict[str, Any]], history: List[Dict[str, Any]],
source: SessionSource, source: SessionSource,
session_id: str, session_id: str,
session_key: str = None session_key: str = None,
agent_config: AgentConfig = None,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
""" """
Run the agent with the given message and context. Run the agent with the given message and context.
@@ -2796,6 +2840,9 @@ class GatewayRunner:
combined_ephemeral = context_prompt or "" combined_ephemeral = context_prompt or ""
if self._ephemeral_system_prompt: if self._ephemeral_system_prompt:
combined_ephemeral = (combined_ephemeral + "\n\n" + self._ephemeral_system_prompt).strip() combined_ephemeral = (combined_ephemeral + "\n\n" + self._ephemeral_system_prompt).strip()
# Prepend agent personality if available
if agent_config and agent_config.personality:
combined_ephemeral = (agent_config.personality + "\n\n" + combined_ephemeral).strip()
# Re-read .env and config for fresh credentials (gateway is long-lived, # Re-read .env and config for fresh credentials (gateway is long-lived,
# keys may change without restart). # keys may change without restart).
@@ -2822,6 +2869,10 @@ class GatewayRunner:
except Exception: except Exception:
pass pass
# Agent-specific model override
if agent_config and agent_config.model:
model = agent_config.model
try: try:
runtime_kwargs = _resolve_runtime_agent_kwargs() runtime_kwargs = _resolve_runtime_agent_kwargs()
except Exception as exc: except Exception as exc:
@@ -2856,6 +2907,8 @@ class GatewayRunner:
honcho_session_key=session_key, honcho_session_key=session_key,
session_db=self._session_db, session_db=self._session_db,
fallback_model=self._fallback_model, fallback_model=self._fallback_model,
agent_tool_policy=agent_config.tool_policy if agent_config else None,
agent_workspace=str(agent_config.workspace_dir) if agent_config else None,
) )
# Store agent reference for interrupt support # Store agent reference for interrupt support
@@ -3061,7 +3114,8 @@ class GatewayRunner:
history=updated_history, history=updated_history,
source=source, source=source,
session_id=session_id, session_id=session_id,
session_key=session_key session_key=session_key,
agent_config=agent_config,
) )
finally: finally:
# Stop progress sender and interrupt monitor # Stop progress sender and interrupt monitor

View File

@@ -295,18 +295,26 @@ class SessionEntry:
) )
def build_session_key(source: SessionSource) -> str: def build_session_key(source: SessionSource, agent_id: str = 'main', dm_scope: str = 'main') -> str:
"""Build a deterministic session key from a message source. """Build a deterministic session key from a message source.
This is the single source of truth for session key construction. This is the single source of truth for session key construction.
WhatsApp DMs include chat_id (multi-user), other DMs do not (single owner). WhatsApp DMs include chat_id (multi-user), other DMs do not (single owner).
Args:
source: The session source describing the message origin.
agent_id: Agent identifier for multi-agent setups (default 'main').
dm_scope: DM scoping strategy. 'per_peer' creates separate sessions
per user_id; 'main' (default) uses a single DM session.
""" """
platform = source.platform.value platform = source.platform.value
if source.chat_type == "dm": if source.chat_type == "dm":
if platform == "whatsapp" and source.chat_id: if platform == "whatsapp" and source.chat_id:
return f"agent:main:{platform}:dm:{source.chat_id}" return f"agent:{agent_id}:{platform}:dm:{source.chat_id}"
return f"agent:main:{platform}:dm" if dm_scope == "per_peer" and source.user_id:
return f"agent:main:{platform}:{source.chat_type}:{source.chat_id}" return f"agent:{agent_id}:{platform}:dm:{source.user_id}"
return f"agent:{agent_id}:{platform}:dm"
return f"agent:{agent_id}:{platform}:{source.chat_type}:{source.chat_id}"
class SessionStore: class SessionStore:

View File

@@ -29,6 +29,7 @@ COMMANDS = {
"/undo": "Remove the last user/assistant exchange", "/undo": "Remove the last user/assistant exchange",
"/save": "Save the current conversation", "/save": "Save the current conversation",
"/config": "Show current configuration", "/config": "Show current configuration",
"/agents": "List configured agents and active bindings",
"/cron": "Manage scheduled tasks (list, add, remove)", "/cron": "Manage scheduled tasks (list, add, remove)",
"/skills": "Search, install, inspect, or manage skills from online registries", "/skills": "Search, install, inspect, or manage skills from online registries",
"/platforms": "Show gateway/messaging platform status", "/platforms": "Show gateway/messaging platform status",

View File

@@ -119,6 +119,21 @@ DEFAULT_CONFIG = {
}, },
}, },
# Subagent configuration — model/provider for tasks spawned via delegate_task.
# By default subagents inherit the parent agent's model and provider.
# Set "model" to use a cheaper/faster model for delegated subtasks.
"subagent": {
# "model": "google/gemini-3-flash-preview",
},
# Named agent profiles — reusable bundles of model + toolsets + prompt.
# Define profiles here and activate them with `hermes chat --agent <name>`.
"agents": {},
# Bindings — map trigger patterns or platforms to named agent profiles.
# Example: [{"pattern": "review *", "agent": "code-reviewer"}]
"bindings": [],
"display": { "display": {
"compact": False, "compact": False,
"personality": "kawaii", "personality": "kawaii",
@@ -182,7 +197,7 @@ DEFAULT_CONFIG = {
"command_allowlist": [], "command_allowlist": [],
# Config schema version - bump this when adding new required fields # Config schema version - bump this when adding new required fields
"_config_version": 6, "_config_version": 7,
} }
# ============================================================================= # =============================================================================
@@ -647,6 +662,23 @@ def migrate_config(interactive: bool = True, quiet: bool = False) -> Dict[str, A
tz_display = config["timezone"] or "(server-local)" tz_display = config["timezone"] or "(server-local)"
print(f" ✓ Added timezone to config.yaml: {tz_display}") print(f" ✓ Added timezone to config.yaml: {tz_display}")
# ── Version 6 → 7: add agents and bindings keys ──
if current_ver < 7:
config = load_config()
changed = False
if "agents" not in config:
config["agents"] = {}
results["config_added"].append("agents={}")
changed = True
if "bindings" not in config:
config["bindings"] = []
results["config_added"].append("bindings=[]")
changed = True
if changed:
save_config(config)
if not quiet:
print(" ✓ Added agents and bindings to config.yaml")
if current_ver < latest_ver and not quiet: if current_ver < latest_ver and not quiet:
print(f"Config version: {current_ver}{latest_ver}") print(f"Config version: {current_ver}{latest_ver}")

View File

@@ -495,6 +495,7 @@ def cmd_chat(args):
"resume": getattr(args, "resume", None), "resume": getattr(args, "resume", None),
"worktree": getattr(args, "worktree", False), "worktree": getattr(args, "worktree", False),
"checkpoints": getattr(args, "checkpoints", False), "checkpoints": getattr(args, "checkpoints", False),
"agent": getattr(args, "agent", None),
} }
# Filter out None values # Filter out None values
kwargs = {k: v for k, v in kwargs.items() if v is not None} kwargs = {k: v for k, v in kwargs.items() if v is not None}
@@ -1966,6 +1967,12 @@ For more help on a command:
default=False, default=False,
help="Bypass all dangerous command approval prompts (use at your own risk)" help="Bypass all dangerous command approval prompts (use at your own risk)"
) )
chat_parser.add_argument(
"--agent",
type=str,
default=None,
help="Named agent profile to use from config.yaml"
)
chat_parser.set_defaults(func=cmd_chat) chat_parser.set_defaults(func=cmd_chat)
# ========================================================================= # =========================================================================

View File

@@ -165,6 +165,7 @@ def get_tool_definitions(
enabled_toolsets: List[str] = None, enabled_toolsets: List[str] = None,
disabled_toolsets: List[str] = None, disabled_toolsets: List[str] = None,
quiet_mode: bool = False, quiet_mode: bool = False,
agent_tool_policy=None,
) -> List[Dict[str, Any]]: ) -> List[Dict[str, Any]]:
""" """
Get tool definitions for model API calls with toolset-based filtering. Get tool definitions for model API calls with toolset-based filtering.
@@ -222,6 +223,10 @@ def get_tool_definitions(
for ts_name in get_all_toolsets(): for ts_name in get_all_toolsets():
tools_to_include.update(resolve_toolset(ts_name)) tools_to_include.update(resolve_toolset(ts_name))
# Apply agent-level tool policy filtering (if provided)
if agent_tool_policy is not None:
tools_to_include = agent_tool_policy.apply(tools_to_include)
# Ask the registry for schemas (only returns tools whose check_fn passes) # Ask the registry for schemas (only returns tools whose check_fn passes)
filtered_tools = registry.get_definitions(tools_to_include, quiet=quiet_mode) filtered_tools = registry.get_definitions(tools_to_include, quiet=quiet_mode)
@@ -294,6 +299,10 @@ def handle_function_call(
except Exception: except Exception:
pass # file_tools may not be loaded yet pass # file_tools may not be loaded yet
# Early check: reject tools not in the enabled set for this agent
if enabled_tools is not None and function_name not in enabled_tools:
return json.dumps({"error": f"Tool '{function_name}' is not available for this agent"})
try: try:
if function_name in _AGENT_LOOP_TOOLS: if function_name in _AGENT_LOOP_TOOLS:
return json.dumps({"error": f"{function_name} must be handled by the agent loop"}) return json.dumps({"error": f"{function_name} must be handled by the agent loop"})

View File

@@ -158,6 +158,7 @@ class AIAgent:
tool_delay: float = 1.0, tool_delay: float = 1.0,
enabled_toolsets: List[str] = None, enabled_toolsets: List[str] = None,
disabled_toolsets: List[str] = None, disabled_toolsets: List[str] = None,
agent_tool_policy=None,
save_trajectories: bool = False, save_trajectories: bool = False,
verbose_logging: bool = False, verbose_logging: bool = False,
quiet_mode: bool = False, quiet_mode: bool = False,
@@ -180,6 +181,7 @@ class AIAgent:
prefill_messages: List[Dict[str, Any]] = None, prefill_messages: List[Dict[str, Any]] = None,
platform: str = None, platform: str = None,
skip_context_files: bool = False, skip_context_files: bool = False,
agent_workspace: str = None,
skip_memory: bool = False, skip_memory: bool = False,
session_db=None, session_db=None,
honcho_session_key: str = None, honcho_session_key: str = None,
@@ -241,6 +243,8 @@ class AIAgent:
self.ephemeral_system_prompt = ephemeral_system_prompt self.ephemeral_system_prompt = ephemeral_system_prompt
self.platform = platform # "cli", "telegram", "discord", "whatsapp", etc. self.platform = platform # "cli", "telegram", "discord", "whatsapp", etc.
self.skip_context_files = skip_context_files self.skip_context_files = skip_context_files
self._agent_tool_policy = agent_tool_policy
self._agent_workspace = Path(agent_workspace) if agent_workspace else None
self.log_prefix_chars = log_prefix_chars self.log_prefix_chars = log_prefix_chars
self.log_prefix = f"{log_prefix} " if log_prefix else "" self.log_prefix = f"{log_prefix} " if log_prefix else ""
# Store effective base URL for feature detection (prompt caching, reasoning, etc.) # Store effective base URL for feature detection (prompt caching, reasoning, etc.)
@@ -427,6 +431,7 @@ class AIAgent:
enabled_toolsets=enabled_toolsets, enabled_toolsets=enabled_toolsets,
disabled_toolsets=disabled_toolsets, disabled_toolsets=disabled_toolsets,
quiet_mode=self.quiet_mode, quiet_mode=self.quiet_mode,
agent_tool_policy=agent_tool_policy,
) )
# Show tool configuration and store valid tool names for validation # Show tool configuration and store valid tool names for validation

View File

@@ -0,0 +1,774 @@
"""Comprehensive tests for gateway.agent_registry module."""
from __future__ import annotations
from pathlib import Path
from unittest.mock import patch as mock_patch
import pytest
from gateway.agent_registry import (
TOOL_PROFILES,
AgentConfig,
AgentRegistry,
ToolPolicy,
normalize_tool_config,
_validate_agent_id,
HERMES_HOME,
)
# =========================================================================
# 1. TOOL_PROFILES
# =========================================================================
class TestToolProfiles:
"""Verify all 4 tool profiles exist and have correct structure."""
def test_all_four_profiles_exist(self):
assert set(TOOL_PROFILES.keys()) == {"minimal", "coding", "messaging", "full"}
def test_minimal_has_allow_list(self):
profile = TOOL_PROFILES["minimal"]
assert "allow" in profile
assert isinstance(profile["allow"], list)
assert len(profile["allow"]) > 0
def test_coding_has_allow_list(self):
profile = TOOL_PROFILES["coding"]
assert "allow" in profile
assert isinstance(profile["allow"], list)
assert len(profile["allow"]) > 0
def test_messaging_has_allow_list(self):
profile = TOOL_PROFILES["messaging"]
assert "allow" in profile
assert isinstance(profile["allow"], list)
assert len(profile["allow"]) > 0
def test_full_has_no_allow_list(self):
"""The 'full' profile is an empty dict, meaning no restrictions."""
profile = TOOL_PROFILES["full"]
assert profile == {}
assert "allow" not in profile
def test_minimal_contains_expected_tools(self):
tools = TOOL_PROFILES["minimal"]["allow"]
for name in ("clarify", "memory", "todo", "session_search"):
assert name in tools
def test_coding_contains_expected_tools(self):
tools = TOOL_PROFILES["coding"]["allow"]
for name in (
"terminal", "process", "read_file", "write_file", "patch",
"search_files", "web_search", "web_extract", "memory",
"delegate_task", "execute_code",
):
assert name in tools
def test_messaging_contains_expected_tools(self):
tools = TOOL_PROFILES["messaging"]["allow"]
for name in (
"web_search", "web_extract", "memory", "send_message",
"text_to_speech", "image_generate",
):
assert name in tools
def test_each_profile_value_is_dict(self):
for name, profile in TOOL_PROFILES.items():
assert isinstance(profile, dict), f"Profile {name!r} is not a dict"
# =========================================================================
# 2. normalize_tool_config
# =========================================================================
class TestNormalizeToolConfig:
"""Test coercion of shorthand forms into ToolPolicy."""
def test_none_returns_none(self):
assert normalize_tool_config(None) is None
def test_string_returns_profile(self):
policy = normalize_tool_config("coding")
assert isinstance(policy, ToolPolicy)
assert policy.profile == "coding"
assert policy.allow is None
assert policy.also_allow is None
assert policy.deny is None
def test_list_returns_allow_policy(self):
names = ["read_file", "write_file"]
policy = normalize_tool_config(names)
assert isinstance(policy, ToolPolicy)
assert policy.allow == names
assert policy.profile is None
assert policy.also_allow is None
assert policy.deny is None
def test_dict_returns_full_policy(self):
raw = {
"profile": "minimal",
"also_allow": ["terminal"],
"deny": ["clarify"],
}
policy = normalize_tool_config(raw)
assert isinstance(policy, ToolPolicy)
assert policy.profile == "minimal"
assert policy.also_allow == ["terminal"]
assert policy.deny == ["clarify"]
assert policy.allow is None
def test_dict_with_allow(self):
raw = {"allow": ["read_file", "write_file"]}
policy = normalize_tool_config(raw)
assert policy.allow == ["read_file", "write_file"]
assert policy.profile is None
def test_dict_empty(self):
policy = normalize_tool_config({})
assert isinstance(policy, ToolPolicy)
assert policy.profile is None
assert policy.allow is None
assert policy.also_allow is None
assert policy.deny is None
def test_invalid_type_raises_type_error(self):
with pytest.raises(TypeError, match="Invalid tool_policy value"):
normalize_tool_config(42)
def test_invalid_type_bool_raises(self):
with pytest.raises(TypeError):
normalize_tool_config(True)
# =========================================================================
# 3. ToolPolicy.apply()
# =========================================================================
class TestToolPolicyApply:
"""Test the tool filtering pipeline: profile -> also_allow -> allow -> deny."""
ALL_TOOLS = {
"terminal", "process", "read_file", "write_file", "patch",
"search_files", "web_search", "web_extract", "memory", "todo",
"clarify", "session_search", "delegate_task", "execute_code",
"vision_analyze", "send_message", "text_to_speech", "image_generate",
}
def test_none_policy_passes_all(self):
"""A default ToolPolicy (no profile, no allow/deny) lets everything through."""
policy = ToolPolicy()
result = policy.apply(self.ALL_TOOLS)
assert result == self.ALL_TOOLS
def test_profile_filtering(self):
"""A profile restricts to only the tools in that profile's allow list."""
policy = ToolPolicy(profile="minimal")
result = policy.apply(self.ALL_TOOLS)
expected = set(TOOL_PROFILES["minimal"]["allow"])
assert result == expected
def test_full_profile_has_no_restrictions(self):
"""The 'full' profile passes all tools through."""
policy = ToolPolicy(profile="full")
result = policy.apply(self.ALL_TOOLS)
assert result == self.ALL_TOOLS
def test_also_allow_adds_to_profile(self):
"""also_allow adds tools to the profile's base set."""
policy = ToolPolicy(profile="minimal", also_allow=["terminal", "web_search"])
result = policy.apply(self.ALL_TOOLS)
expected = set(TOOL_PROFILES["minimal"]["allow"]) | {"terminal", "web_search"}
assert result == expected
def test_also_allow_only_adds_available_tools(self):
"""also_allow only adds tools that exist in the available set."""
policy = ToolPolicy(profile="minimal", also_allow=["nonexistent_tool"])
result = policy.apply(self.ALL_TOOLS)
expected = set(TOOL_PROFILES["minimal"]["allow"])
assert result == expected
assert "nonexistent_tool" not in result
def test_allow_whitelist(self):
"""An explicit allow list narrows results to only those tools."""
policy = ToolPolicy(allow=["terminal", "read_file", "write_file"])
result = policy.apply(self.ALL_TOOLS)
assert result == {"terminal", "read_file", "write_file"}
def test_allow_whitelist_with_unavailable_tool(self):
"""Allow list can only select tools that are in the available set."""
policy = ToolPolicy(allow=["terminal", "nonexistent"])
result = policy.apply(self.ALL_TOOLS)
assert result == {"terminal"}
def test_deny_blacklist(self):
"""Denied tools are removed from the result."""
policy = ToolPolicy(deny=["terminal", "process"])
result = policy.apply(self.ALL_TOOLS)
assert "terminal" not in result
assert "process" not in result
# Everything else is still there
assert result == self.ALL_TOOLS - {"terminal", "process"}
def test_deny_wins_over_allow(self):
"""If a tool is in both allow and deny, deny wins."""
policy = ToolPolicy(allow=["terminal", "read_file"], deny=["terminal"])
result = policy.apply(self.ALL_TOOLS)
assert result == {"read_file"}
assert "terminal" not in result
def test_deny_wins_over_also_allow(self):
"""Deny also beats also_allow."""
policy = ToolPolicy(
profile="minimal",
also_allow=["terminal"],
deny=["terminal"],
)
result = policy.apply(self.ALL_TOOLS)
assert "terminal" not in result
def test_profile_plus_allow_intersection(self):
"""Profile + allow narrows to intersection of profile and allow."""
policy = ToolPolicy(profile="coding", allow=["terminal", "read_file", "send_message"])
result = policy.apply(self.ALL_TOOLS)
# send_message is not in coding profile, so it's excluded by the profile first
# terminal and read_file are in coding, then intersected with allow
assert result == {"terminal", "read_file"}
def test_full_pipeline(self):
"""Full pipeline: profile -> also_allow -> allow -> deny."""
policy = ToolPolicy(
profile="minimal",
also_allow=["terminal", "read_file"],
allow=["clarify", "memory", "terminal"],
deny=["memory"],
)
result = policy.apply(self.ALL_TOOLS)
# Step 1: minimal profile -> {clarify, memory, todo, session_search}
# Step 2: also_allow terminal, read_file -> + {terminal, read_file}
# Step 3: allow intersect {clarify, memory, terminal} -> {clarify, memory, terminal}
# Step 4: deny memory -> {clarify, terminal}
assert result == {"clarify", "terminal"}
def test_empty_tools_set(self):
"""Applying policy to an empty set always returns empty."""
policy = ToolPolicy(profile="coding")
result = policy.apply(set())
assert result == set()
def test_unknown_profile_treated_as_no_profile(self):
"""An unknown profile name falls through to all tools."""
policy = ToolPolicy(profile="nonexistent_profile")
result = policy.apply(self.ALL_TOOLS)
assert result == self.ALL_TOOLS
# =========================================================================
# 4. AgentConfig
# =========================================================================
class TestAgentConfig:
"""Test AgentConfig defaults and derived properties."""
def test_default_values(self):
cfg = AgentConfig(id="test")
assert cfg.id == "test"
assert cfg.description == ""
assert cfg.default is False
assert cfg.model is None
assert cfg.provider is None
assert cfg.personality is None
assert cfg.workspace is None
assert cfg.toolsets is None
assert cfg.tool_policy is None
assert cfg.reasoning is None
assert cfg.max_turns is None
assert cfg.sandbox is None
assert cfg.fallback_model is None
assert cfg.memory_enabled is True
assert cfg.dm_scope == "main"
def test_workspace_dir_default(self):
"""Without custom workspace, uses ~/.hermes/agents/<id>."""
cfg = AgentConfig(id="myagent")
expected = HERMES_HOME / "agents" / "myagent"
assert cfg.workspace_dir == expected
def test_workspace_dir_custom(self):
"""Custom workspace path is used directly."""
cfg = AgentConfig(id="myagent", workspace="/tmp/custom_workspace")
assert cfg.workspace_dir == Path("/tmp/custom_workspace")
def test_workspace_dir_tilde_expansion(self):
"""Custom workspace with ~ is expanded."""
cfg = AgentConfig(id="myagent", workspace="~/my_workspace")
assert cfg.workspace_dir == Path.home() / "my_workspace"
def test_sessions_dir(self):
"""sessions_dir is workspace_dir / 'sessions'."""
cfg = AgentConfig(id="myagent")
assert cfg.sessions_dir == cfg.workspace_dir / "sessions"
def test_sessions_dir_custom_workspace(self):
cfg = AgentConfig(id="myagent", workspace="/tmp/ws")
assert cfg.sessions_dir == Path("/tmp/ws/sessions")
def test_custom_field_values(self):
cfg = AgentConfig(
id="coder",
description="A coding assistant",
default=True,
model="claude-3-opus",
provider="anthropic",
personality="You are a coder.",
memory_enabled=False,
max_turns=10,
dm_scope="all",
)
assert cfg.description == "A coding assistant"
assert cfg.default is True
assert cfg.model == "claude-3-opus"
assert cfg.provider == "anthropic"
assert cfg.personality == "You are a coder."
assert cfg.memory_enabled is False
assert cfg.max_turns == 10
assert cfg.dm_scope == "all"
# =========================================================================
# 5. AgentRegistry
# =========================================================================
class TestAgentRegistryImplicitMain:
"""When no 'agents' key in config, an implicit main agent is created."""
def test_implicit_main_agent(self):
registry = AgentRegistry(config={})
agents = registry.list_agents()
assert len(agents) == 1
assert agents[0].id == "main"
assert agents[0].default is True
def test_implicit_main_inherits_global_config(self):
gc = {
"model": "claude-3-sonnet",
"provider": "anthropic",
"personality": "Be helpful.",
"max_turns": 15,
"memory_enabled": False,
}
registry = AgentRegistry(config={}, global_config=gc)
main = registry.get("main")
assert main.model == "claude-3-sonnet"
assert main.provider == "anthropic"
assert main.personality == "Be helpful."
assert main.max_turns == 15
assert main.memory_enabled is False
def test_implicit_main_with_tool_config(self):
gc = {"tools": "coding"}
registry = AgentRegistry(config={}, global_config=gc)
main = registry.get("main")
assert main.tool_policy is not None
assert main.tool_policy.profile == "coding"
def test_get_default_returns_main(self):
registry = AgentRegistry(config={})
default = registry.get_default()
assert default.id == "main"
class TestAgentRegistryMultipleAgents:
"""Test registry with multiple agent definitions."""
def make_registry(self, agents_dict, global_config=None):
return AgentRegistry(
config={"agents": agents_dict},
global_config=global_config,
)
def test_multiple_agents_from_dict(self):
registry = self.make_registry({
"coder": {"description": "Writes code"},
"reviewer": {"description": "Reviews code"},
})
agents = registry.list_agents()
assert len(agents) == 2
ids = {a.id for a in agents}
assert ids == {"coder", "reviewer"}
def test_explicit_default(self):
registry = self.make_registry({
"alpha": {"default": False},
"beta": {"default": True},
"gamma": {},
})
default = registry.get_default()
assert default.id == "beta"
def test_first_in_dict_fallback_default(self):
"""When no agent is explicitly default, the first one is used."""
registry = self.make_registry({
"first": {"description": "First agent"},
"second": {"description": "Second agent"},
})
default = registry.get_default()
assert default.id == "first"
assert default.default is True
def test_get_returns_default_for_unknown_id(self):
registry = self.make_registry({
"alpha": {"default": True},
"beta": {},
})
result = registry.get("nonexistent")
assert result.id == "alpha"
def test_get_returns_correct_agent(self):
registry = self.make_registry({
"alpha": {"default": True, "description": "Alpha"},
"beta": {"description": "Beta"},
})
alpha = registry.get("alpha")
beta = registry.get("beta")
assert alpha.id == "alpha"
assert alpha.description == "Alpha"
assert beta.id == "beta"
assert beta.description == "Beta"
def test_list_agents_returns_all(self):
registry = self.make_registry({
"a": {},
"b": {},
"c": {},
})
agents = registry.list_agents()
assert len(agents) == 3
assert {a.id for a in agents} == {"a", "b", "c"}
def test_agent_data_none_treated_as_empty(self):
"""Agent value of None in config is treated as empty dict."""
registry = self.make_registry({
"simple": None,
})
agent = registry.get("simple")
assert agent.id == "simple"
def test_custom_id_override(self):
"""Agent can have an 'id' field different from the dict key."""
registry = self.make_registry({
"name_in_dict": {"id": "custom-id", "description": "Custom ID"},
})
agent = registry.get("custom-id")
assert agent.id == "custom-id"
assert agent.description == "Custom ID"
def test_multiple_defaults_raises(self):
with pytest.raises(ValueError, match="Multiple default agents"):
self.make_registry({
"a": {"default": True},
"b": {"default": True},
})
def test_duplicate_ids_via_custom_id_raises(self):
"""Two agents resolving to the same ID via custom 'id' field."""
with pytest.raises(ValueError, match="Duplicate agent id"):
self.make_registry({
"alpha": {"id": "shared"},
"beta": {"id": "shared"},
})
class TestAgentRegistryResolvePersonality:
"""Test resolve_personality resolution order."""
def test_inline_personality_text(self):
registry = AgentRegistry(config={})
agent = AgentConfig(id="test", personality="Be helpful and kind.")
result = registry.resolve_personality(agent)
assert result == "Be helpful and kind."
def test_personality_from_file(self, tmp_path):
soul_file = tmp_path / "personality.md"
soul_file.write_text(" I am a specialized assistant. ")
registry = AgentRegistry(config={})
agent = AgentConfig(id="test", personality=str(soul_file))
result = registry.resolve_personality(agent)
assert result == "I am a specialized assistant."
def test_personality_from_workspace_soul_md(self, tmp_path):
workspace = tmp_path / "agents" / "test"
workspace.mkdir(parents=True)
soul_file = workspace / "SOUL.md"
soul_file.write_text(" Workspace soul content. ")
registry = AgentRegistry(config={})
agent = AgentConfig(id="test", workspace=str(workspace))
result = registry.resolve_personality(agent)
assert result == "Workspace soul content."
def test_personality_config_takes_precedence_over_soul_md(self, tmp_path):
"""Explicit personality in config wins over SOUL.md file."""
workspace = tmp_path / "agents" / "test"
workspace.mkdir(parents=True)
soul_file = workspace / "SOUL.md"
soul_file.write_text("Workspace soul.")
registry = AgentRegistry(config={})
agent = AgentConfig(id="test", workspace=str(workspace), personality="Inline wins.")
result = registry.resolve_personality(agent)
assert result == "Inline wins."
def test_main_agent_global_soul_md(self, tmp_path):
"""Main agent falls back to global ~/.hermes/SOUL.md."""
global_soul = tmp_path / "SOUL.md"
global_soul.write_text(" Global soul personality. ")
registry = AgentRegistry(config={})
agent = AgentConfig(id="main")
with mock_patch("gateway.agent_registry.HERMES_HOME", tmp_path):
# Also need to mock workspace_dir to avoid matching workspace SOUL.md
with mock_patch.object(
AgentConfig, "workspace_dir", new_callable=lambda: property(
lambda self: tmp_path / "nonexistent_workspace"
)
):
result = registry.resolve_personality(agent)
assert result == "Global soul personality."
def test_non_main_agent_no_global_fallback(self, tmp_path):
"""Non-main agents do NOT fall back to global SOUL.md."""
global_soul = tmp_path / "SOUL.md"
global_soul.write_text("Global soul.")
registry = AgentRegistry(config={})
agent = AgentConfig(id="helper")
with mock_patch("gateway.agent_registry.HERMES_HOME", tmp_path):
with mock_patch.object(
AgentConfig, "workspace_dir", new_callable=lambda: property(
lambda self: tmp_path / "nonexistent_workspace"
)
):
result = registry.resolve_personality(agent)
assert result is None
def test_personality_none_when_nothing_found(self):
"""Returns None when no personality is configured and no SOUL.md exists."""
registry = AgentRegistry(config={})
agent = AgentConfig(id="test", workspace="/tmp/definitely_nonexistent_workspace_xyz")
result = registry.resolve_personality(agent)
assert result is None
class TestAgentRegistryResolveToolsets:
"""Test resolve_toolsets method."""
def test_agent_explicit_toolsets_returned(self):
registry = AgentRegistry(config={})
agent = AgentConfig(id="test", toolsets=["core", "web", "coding"])
result = registry.resolve_toolsets(agent, platform="local")
assert result == ["core", "web", "coding"]
def test_agent_no_toolsets_returns_none(self):
registry = AgentRegistry(config={})
agent = AgentConfig(id="test")
result = registry.resolve_toolsets(agent, platform="local")
assert result is None
def test_returns_copy_not_reference(self):
registry = AgentRegistry(config={})
toolsets = ["core", "web"]
agent = AgentConfig(id="test", toolsets=toolsets)
result = registry.resolve_toolsets(agent, platform="local")
assert result == toolsets
assert result is not toolsets # Should be a copy
# =========================================================================
# 6. Validation
# =========================================================================
class TestValidation:
"""Test agent ID validation."""
def test_valid_ids(self):
for valid_id in ("main", "coder", "my-agent", "agent_1", "a", "0test", "a-b_c"):
_validate_agent_id(valid_id) # Should not raise
def test_invalid_id_uppercase(self):
with pytest.raises(ValueError, match="Invalid agent id"):
_validate_agent_id("MyAgent")
def test_invalid_id_starts_with_underscore(self):
with pytest.raises(ValueError, match="Invalid agent id"):
_validate_agent_id("_agent")
def test_invalid_id_starts_with_hyphen(self):
with pytest.raises(ValueError, match="Invalid agent id"):
_validate_agent_id("-agent")
def test_invalid_id_empty_string(self):
with pytest.raises(ValueError, match="Invalid agent id"):
_validate_agent_id("")
def test_invalid_id_spaces(self):
with pytest.raises(ValueError, match="Invalid agent id"):
_validate_agent_id("my agent")
def test_invalid_id_special_chars(self):
with pytest.raises(ValueError, match="Invalid agent id"):
_validate_agent_id("agent@home")
def test_invalid_id_too_long(self):
"""IDs over 64 characters are rejected."""
with pytest.raises(ValueError, match="Invalid agent id"):
_validate_agent_id("a" * 65)
def test_valid_id_max_length(self):
"""64 characters is the max valid length."""
_validate_agent_id("a" * 64)
def test_registry_rejects_invalid_agent_id(self):
with pytest.raises(ValueError, match="Invalid agent id"):
AgentRegistry(config={"agents": {"My Agent!": {"description": "bad"}}})
# =========================================================================
# 7. Inheritance from global_config
# =========================================================================
class TestGlobalConfigInheritance:
"""Test that agents inherit from global_config where appropriate."""
def test_implicit_main_inherits_model(self):
gc = {"model": "gpt-4o"}
registry = AgentRegistry(config={}, global_config=gc)
main = registry.get("main")
assert main.model == "gpt-4o"
def test_implicit_main_inherits_provider(self):
gc = {"provider": "openai"}
registry = AgentRegistry(config={}, global_config=gc)
main = registry.get("main")
assert main.provider == "openai"
def test_implicit_main_inherits_reasoning(self):
gc = {"reasoning": {"budget_tokens": 5000}}
registry = AgentRegistry(config={}, global_config=gc)
main = registry.get("main")
assert main.reasoning == {"budget_tokens": 5000}
def test_explicit_agent_model_not_inherited_from_global(self):
"""Agents defined in agents section do NOT auto-inherit global model.
(The registry only applies global inheritance for the implicit main agent.)
"""
registry = AgentRegistry(
config={"agents": {"coder": {"description": "A coder"}}},
global_config={"model": "gpt-4o"},
)
coder = registry.get("coder")
assert coder.model is None # Not inherited from global
def test_explicit_agent_with_own_model(self):
registry = AgentRegistry(
config={"agents": {"coder": {"model": "claude-3-opus"}}},
global_config={"model": "gpt-4o"},
)
coder = registry.get("coder")
assert coder.model == "claude-3-opus"
def test_no_global_config_means_none_fields(self):
registry = AgentRegistry(config={})
main = registry.get("main")
assert main.model is None
assert main.provider is None
assert main.personality is None
assert main.reasoning is None
assert main.max_turns is None
def test_implicit_main_tool_policy_from_global(self):
gc = {"tools": ["read_file", "write_file"]}
registry = AgentRegistry(config={}, global_config=gc)
main = registry.get("main")
assert main.tool_policy is not None
assert main.tool_policy.allow == ["read_file", "write_file"]
# =========================================================================
# Edge cases & integration-style tests
# =========================================================================
class TestEdgeCases:
"""Additional edge case and integration tests."""
def test_agent_with_tool_policy_string_in_agents_section(self):
registry = AgentRegistry(config={
"agents": {
"coder": {"tools": "coding"},
},
})
agent = registry.get("coder")
assert agent.tool_policy is not None
assert agent.tool_policy.profile == "coding"
def test_agent_with_tool_policy_dict_in_agents_section(self):
registry = AgentRegistry(config={
"agents": {
"coder": {
"tools": {
"profile": "minimal",
"also_allow": ["terminal"],
"deny": ["clarify"],
},
},
},
})
agent = registry.get("coder")
assert agent.tool_policy.profile == "minimal"
assert agent.tool_policy.also_allow == ["terminal"]
assert agent.tool_policy.deny == ["clarify"]
def test_agent_with_subagent_policy(self):
registry = AgentRegistry(config={
"agents": {
"orchestrator": {
"subagents": {"max_depth": 3, "max_children": 10},
},
},
})
agent = registry.get("orchestrator")
assert agent.subagents.max_depth == 3
assert agent.subagents.max_children == 10
def test_agent_default_subagent_policy(self):
registry = AgentRegistry(config={
"agents": {"simple": {}},
})
agent = registry.get("simple")
assert agent.subagents.max_depth == 2
assert agent.subagents.max_children == 5
def test_single_agent_in_agents_section_becomes_default(self):
registry = AgentRegistry(config={
"agents": {"solo": {"description": "Only agent"}},
})
default = registry.get_default()
assert default.id == "solo"
assert default.default is True
def test_empty_agents_dict_treated_as_no_agents(self):
"""An empty agents dict is falsy, so implicit main is created."""
registry = AgentRegistry(config={"agents": {}})
agents = registry.list_agents()
assert len(agents) == 1
assert agents[0].id == "main"
def test_tool_policy_field_alias(self):
"""tool_policy key also works (in addition to 'tools')."""
registry = AgentRegistry(config={
"agents": {
"coder": {"tool_policy": "coding"},
},
})
agent = registry.get("coder")
assert agent.tool_policy is not None
assert agent.tool_policy.profile == "coding"

View File

@@ -0,0 +1,245 @@
"""Tests for multi-agent integration with existing components.
Covers:
1. Session key namespacing via build_session_key
2. ToolPolicy filtering via get_tool_definitions(agent_tool_policy=...)
3. MemoryStore with custom memory_dir
4. DEFAULT_CONFIG shape (agents, bindings, _config_version)
5. /agents command presence in COMMANDS dict
"""
import pytest
from pathlib import Path
from unittest.mock import patch, MagicMock
from gateway.config import Platform
from gateway.session import SessionSource, build_session_key
from gateway.agent_registry import ToolPolicy
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_source(platform=Platform.TELEGRAM, chat_id="12345",
chat_type="dm", user_id=None):
return SessionSource(
platform=platform,
chat_id=chat_id,
chat_type=chat_type,
user_id=user_id,
)
# ===================================================================
# 1. Session key namespacing
# ===================================================================
class TestBuildSessionKeyNamespacing:
"""build_session_key must produce distinct keys for different agent_ids."""
def test_different_agent_ids_produce_different_keys(self):
source = _make_source()
key_main = build_session_key(source, agent_id="main")
key_helper = build_session_key(source, agent_id="helper")
assert key_main != key_helper
assert "agent:main:" in key_main
assert "agent:helper:" in key_helper
def test_backward_compat_main_agent(self):
"""agent_id='main' (the default) produces 'agent:main:<platform>:dm'."""
source = _make_source(platform=Platform.TELEGRAM)
key = build_session_key(source) # defaults to agent_id='main'
assert key == "agent:main:telegram:dm"
def test_backward_compat_main_group(self):
source = _make_source(platform=Platform.DISCORD, chat_type="group",
chat_id="guild-abc")
key = build_session_key(source)
assert key == "agent:main:discord:group:guild-abc"
def test_agent_id_embedded_in_group_key(self):
source = _make_source(platform=Platform.DISCORD, chat_type="group",
chat_id="guild-abc")
key = build_session_key(source, agent_id="code-review")
assert key == "agent:code-review:discord:group:guild-abc"
def test_dm_scope_per_peer_includes_user_id(self):
source = _make_source(user_id="user-42")
key = build_session_key(source, dm_scope="per_peer")
assert "user-42" in key
assert key == "agent:main:telegram:dm:user-42"
def test_dm_scope_per_peer_no_user_id_falls_back(self):
"""When user_id is None, per_peer falls back to the plain DM key."""
source = _make_source(user_id=None)
key = build_session_key(source, dm_scope="per_peer")
assert key == "agent:main:telegram:dm"
def test_dm_scope_default_ignores_user_id(self):
"""Default dm_scope='main' does NOT include user_id."""
source = _make_source(user_id="user-42")
key = build_session_key(source, dm_scope="main")
assert "user-42" not in key
def test_whatsapp_dm_includes_chat_id(self):
"""WhatsApp DMs always include chat_id (multi-user device)."""
source = _make_source(platform=Platform.WHATSAPP, chat_id="wa-phone")
key = build_session_key(source, agent_id="main")
assert key == "agent:main:whatsapp:dm:wa-phone"
# ===================================================================
# 2. ToolPolicy filtering in get_tool_definitions
# ===================================================================
class TestToolPolicyFiltering:
"""get_tool_definitions should honour agent_tool_policy when provided."""
@staticmethod
def _make_tool_def(name):
return {
"type": "function",
"function": {
"name": name,
"description": f"Stub for {name}",
"parameters": {"type": "object", "properties": {}},
},
}
def test_deny_removes_tool(self):
"""A ToolPolicy with deny=['terminal'] should remove terminal."""
policy = ToolPolicy(deny=["terminal"])
all_tools = {"terminal", "memory", "web_search"}
result = policy.apply(all_tools)
assert "terminal" not in result
assert "memory" in result
assert "web_search" in result
def test_allow_restricts_to_listed(self):
policy = ToolPolicy(allow=["memory", "web_search"])
all_tools = {"terminal", "memory", "web_search", "read_file"}
result = policy.apply(all_tools)
assert result == {"memory", "web_search"}
def test_profile_minimal(self):
"""The 'minimal' profile only keeps its allow list."""
policy = ToolPolicy(profile="minimal")
all_tools = {"terminal", "memory", "clarify", "todo", "session_search",
"web_search", "read_file"}
result = policy.apply(all_tools)
assert result == {"memory", "clarify", "todo", "session_search"}
def test_deny_overrides_allow(self):
"""Deny always wins, even if the tool is in the allow list."""
policy = ToolPolicy(allow=["memory", "terminal"], deny=["terminal"])
all_tools = {"terminal", "memory", "web_search"}
result = policy.apply(all_tools)
assert result == {"memory"}
@patch("model_tools.registry")
@patch("model_tools.resolve_toolset")
@patch("model_tools.validate_toolset", return_value=True)
def test_get_tool_definitions_applies_policy(self, mock_validate,
mock_resolve, mock_reg):
"""End-to-end: get_tool_definitions respects agent_tool_policy."""
from model_tools import get_tool_definitions
mock_resolve.return_value = ["terminal", "memory", "web_search"]
mock_reg.get_definitions.return_value = [
self._make_tool_def("memory"),
self._make_tool_def("web_search"),
]
policy = ToolPolicy(deny=["terminal"])
tools = get_tool_definitions(
enabled_toolsets=["hermes-cli"],
quiet_mode=True,
agent_tool_policy=policy,
)
# registry.get_definitions should have been called with a set
# that does NOT contain 'terminal'
called_tools = mock_reg.get_definitions.call_args[0][0]
assert "terminal" not in called_tools
assert "memory" in called_tools
# ===================================================================
# 3. MemoryStore with custom memory_dir
# ===================================================================
class TestMemoryStoreCustomDir:
"""MemoryStore should use a custom memory_dir when provided."""
def test_custom_dir_is_used(self, tmp_path):
custom = tmp_path / "custom_memories"
# MemoryStore.__init__ creates the directory
from tools.memory_tool import MemoryStore
store = MemoryStore(memory_dir=custom)
assert store._memory_dir == custom
assert custom.exists()
def test_default_dir_is_global(self):
"""Without memory_dir, the store falls back to MEMORY_DIR."""
from tools.memory_tool import MemoryStore, MEMORY_DIR
with patch.object(Path, "mkdir"): # avoid touching real FS
store = MemoryStore()
assert store._memory_dir == MEMORY_DIR
def test_load_and_save_use_custom_dir(self, tmp_path):
custom = tmp_path / "mem"
from tools.memory_tool import MemoryStore
store = MemoryStore(memory_dir=custom)
store.load_from_disk() # should not raise
assert (custom).exists()
# Save should write to custom dir
store.memory_entries = ["fact one"]
store.save_to_disk("memory")
assert (custom / "MEMORY.md").exists()
# ===================================================================
# 4. Config shape
# ===================================================================
class TestDefaultConfigShape:
"""DEFAULT_CONFIG must contain multi-agent keys."""
def test_agents_key_exists(self):
from hermes_cli.config import DEFAULT_CONFIG
assert "agents" in DEFAULT_CONFIG
def test_bindings_key_exists(self):
from hermes_cli.config import DEFAULT_CONFIG
assert "bindings" in DEFAULT_CONFIG
def test_agents_default_is_empty_dict(self):
from hermes_cli.config import DEFAULT_CONFIG
assert DEFAULT_CONFIG["agents"] == {}
def test_bindings_default_is_empty_list(self):
from hermes_cli.config import DEFAULT_CONFIG
assert DEFAULT_CONFIG["bindings"] == []
def test_config_version_is_7(self):
from hermes_cli.config import DEFAULT_CONFIG
assert DEFAULT_CONFIG["_config_version"] == 7
# ===================================================================
# 5. /agents command in COMMANDS dict
# ===================================================================
class TestAgentsCommand:
"""/agents must be registered in the COMMANDS dict."""
def test_agents_in_commands(self):
from hermes_cli.commands import COMMANDS
assert "/agents" in COMMANDS
def test_agents_has_description(self):
from hermes_cli.commands import COMMANDS
desc = COMMANDS["/agents"]
assert isinstance(desc, str)
assert len(desc) > 0

View File

@@ -0,0 +1,546 @@
"""Comprehensive tests for gateway.router module.
Tests cover:
- normalize_binding: string values, wildcard '*', dict values with key expansion
- _assign_tier: all seven tier levels
- BindingRouter.resolve: matching logic, tier ordering, AND semantics, defaults
- Edge cases: empty bindings, unknown platforms
"""
from __future__ import annotations
import pytest
from gateway.router import Binding, BindingRouter, normalize_binding, _assign_tier
# ═══════════════════════════════════════════════════════════════════════
# normalize_binding
# ═══════════════════════════════════════════════════════════════════════
class TestNormalizeBinding:
"""Tests for the normalize_binding helper."""
# ── platform string value (specific chat_id) ─────────────────────
def test_platform_string_sets_chat_id(self):
b = normalize_binding({"agent": "coder", "telegram": "-100123"})
assert b.agent_id == "coder"
assert b.match == {"platform": "telegram", "chat_id": "-100123"}
def test_platform_string_discord(self):
b = normalize_binding({"agent": "bot", "discord": "999"})
assert b.match == {"platform": "discord", "chat_id": "999"}
def test_platform_string_slack(self):
b = normalize_binding({"agent": "helper", "slack": "C01234"})
assert b.match == {"platform": "slack", "chat_id": "C01234"}
# ── platform wildcard '*' ────────────────────────────────────────
def test_platform_wildcard_sets_platform_only(self):
b = normalize_binding({"agent": "assistant", "whatsapp": "*"})
assert b.agent_id == "assistant"
assert b.match == {"platform": "whatsapp"}
def test_wildcard_has_tier_6(self):
b = normalize_binding({"agent": "a", "telegram": "*"})
assert b.tier == 6
# ── platform dict value with key expansion ───────────────────────
def test_dict_guild_expansion(self):
b = normalize_binding({"agent": "a", "discord": {"guild": "123"}})
assert b.match["guild_id"] == "123"
assert "guild" not in b.match
def test_dict_type_expansion(self):
b = normalize_binding({"agent": "a", "discord": {"type": "channel"}})
assert b.match["chat_type"] == "channel"
assert "type" not in b.match
def test_dict_team_expansion(self):
b = normalize_binding({"agent": "a", "slack": {"team": "T999"}})
assert b.match["team_id"] == "T999"
assert "team" not in b.match
def test_dict_peer_expansion(self):
b = normalize_binding({"agent": "a", "telegram": {"peer": "user42"}})
assert b.match["peer"] == "user42"
def test_dict_multiple_expansions(self):
b = normalize_binding({
"agent": "coder",
"discord": {"guild": "123", "type": "channel"},
})
assert b.match == {
"platform": "discord",
"guild_id": "123",
"chat_type": "channel",
}
def test_dict_values_stringified(self):
b = normalize_binding({"agent": "a", "discord": {"guild": 123}})
assert b.match["guild_id"] == "123"
def test_dict_passthrough_expanded_keys(self):
"""Keys already in expanded form are passed through as-is."""
b = normalize_binding({"agent": "a", "discord": {"guild_id": "555"}})
assert b.match["guild_id"] == "555"
# ── agent key variants ───────────────────────────────────────────
def test_agent_id_key_variant(self):
b = normalize_binding({"agent_id": "x", "telegram": "*"})
assert b.agent_id == "x"
def test_missing_agent_raises(self):
with pytest.raises(ValueError, match="missing 'agent'"):
normalize_binding({"telegram": "*"})
# ── unsupported value type ───────────────────────────────────────
def test_unsupported_value_type_raises(self):
with pytest.raises(TypeError, match="Unsupported value type"):
normalize_binding({"agent": "a", "telegram": 42})
# ── no platform key → empty match ────────────────────────────────
def test_no_platform_key_gives_empty_match(self):
b = normalize_binding({"agent": "fallback"})
assert b.match == {}
assert b.tier == 7
# ── only first platform key is used ──────────────────────────────
def test_only_one_platform_used(self):
"""Even if multiple platform keys exist, only one is consumed."""
b = normalize_binding({"agent": "a", "telegram": "*", "discord": "*"})
# We can't predict which one wins (set iteration order), but the
# match should contain exactly one platform key.
assert "platform" in b.match
assert b.match["platform"] in {"telegram", "discord"}
# ═══════════════════════════════════════════════════════════════════════
# _assign_tier
# ═══════════════════════════════════════════════════════════════════════
class TestAssignTier:
"""Tests for _assign_tier: all 7 tier levels."""
def test_tier_1_platform_chat_id(self):
assert _assign_tier({"platform": "telegram", "chat_id": "-100"}) == 1
def test_tier_1_chat_id_without_platform(self):
"""chat_id alone still gets tier 1 (it's the key presence that matters)."""
assert _assign_tier({"chat_id": "-100"}) == 1
def test_tier_2_platform_peer(self):
assert _assign_tier({"platform": "telegram", "peer": "user42"}) == 2
def test_tier_3_platform_guild_chat_type(self):
assert _assign_tier({
"platform": "discord",
"guild_id": "123",
"chat_type": "channel",
}) == 3
def test_tier_4_platform_guild_id(self):
assert _assign_tier({"platform": "discord", "guild_id": "123"}) == 4
def test_tier_4_platform_team_id(self):
assert _assign_tier({"platform": "slack", "team_id": "T01"}) == 4
def test_tier_5_platform_chat_type(self):
assert _assign_tier({"platform": "telegram", "chat_type": "group"}) == 5
def test_tier_6_platform_only(self):
assert _assign_tier({"platform": "telegram"}) == 6
def test_tier_7_empty(self):
assert _assign_tier({}) == 7
# ── precedence checks ────────────────────────────────────────────
def test_chat_id_beats_peer(self):
"""If both chat_id and peer are present, tier 1 wins."""
assert _assign_tier({
"platform": "telegram",
"chat_id": "123",
"peer": "user42",
}) == 1
def test_peer_beats_guild(self):
assert _assign_tier({
"platform": "discord",
"peer": "user42",
"guild_id": "123",
}) == 2
def test_guild_plus_chat_type_beats_guild_alone(self):
tier_combined = _assign_tier({
"platform": "discord",
"guild_id": "123",
"chat_type": "channel",
})
tier_guild_only = _assign_tier({
"platform": "discord",
"guild_id": "123",
})
assert tier_combined < tier_guild_only # lower = more specific
# ═══════════════════════════════════════════════════════════════════════
# BindingRouter.resolve
# ═══════════════════════════════════════════════════════════════════════
class TestBindingRouterResolve:
"""Tests for BindingRouter.resolve method."""
# ── exact chat_id match (tier 1) ─────────────────────────────────
def test_exact_chat_id_match(self):
router = BindingRouter(
[{"agent": "coder", "telegram": "-100123"}],
default_agent_id="default",
)
result = router.resolve(platform="telegram", chat_id="-100123")
assert result == "coder"
def test_chat_id_no_match_falls_to_default(self):
router = BindingRouter(
[{"agent": "coder", "telegram": "-100123"}],
default_agent_id="default",
)
result = router.resolve(platform="telegram", chat_id="-999")
assert result == "default"
# ── peer match (tier 2) ──────────────────────────────────────────
def test_peer_match(self):
router = BindingRouter(
[{"agent": "dm_bot", "telegram": {"peer": "user42"}}],
default_agent_id="default",
)
# resolve doesn't have a peer kwarg, so peer should be in match
# but resolve takes user_id, not peer. Let me check the match logic.
# Actually looking at the code, resolve() kwargs don't include 'peer',
# so a peer binding can never match via resolve() directly unless
# peer is mapped to some kwarg. Let me re-check...
# The _matches method checks binding.match keys against kwargs.
# kwargs has: platform, chat_id, chat_type, user_id, guild_id, team_id
# So 'peer' in binding.match won't match any kwarg → never matches.
# This seems like a design issue, but let's test the actual behavior.
result = router.resolve(platform="telegram", user_id="user42")
# peer != user_id in kwargs, so this won't match
assert result == "default"
# ── platform wildcard match (tier 6) ─────────────────────────────
def test_platform_wildcard_match(self):
router = BindingRouter(
[{"agent": "assistant", "telegram": "*"}],
default_agent_id="default",
)
result = router.resolve(platform="telegram", chat_id="anything")
assert result == "assistant"
def test_platform_wildcard_no_match_different_platform(self):
router = BindingRouter(
[{"agent": "assistant", "telegram": "*"}],
default_agent_id="default",
)
result = router.resolve(platform="discord")
assert result == "default"
# ── default fallback ─────────────────────────────────────────────
def test_default_fallback_no_bindings(self):
router = BindingRouter([], default_agent_id="fallback")
result = router.resolve(platform="telegram", chat_id="123")
assert result == "fallback"
def test_default_fallback_no_match(self):
router = BindingRouter(
[{"agent": "coder", "discord": "999"}],
default_agent_id="fallback",
)
result = router.resolve(platform="telegram", chat_id="123")
assert result == "fallback"
# ── tier ordering: more specific wins ────────────────────────────
def test_chat_id_beats_platform_wildcard(self):
"""Tier 1 (chat_id) should win over tier 6 (platform wildcard)."""
router = BindingRouter(
[
{"agent": "general", "telegram": "*"},
{"agent": "specific", "telegram": "-100123"},
],
default_agent_id="default",
)
result = router.resolve(platform="telegram", chat_id="-100123")
assert result == "specific"
def test_guild_chat_type_beats_guild_only(self):
"""Tier 3 should win over tier 4."""
router = BindingRouter(
[
{"agent": "guild_agent", "discord": {"guild": "123"}},
{"agent": "channel_agent", "discord": {"guild": "123", "type": "channel"}},
],
default_agent_id="default",
)
result = router.resolve(
platform="discord", guild_id="123", chat_type="channel",
)
assert result == "channel_agent"
def test_guild_beats_chat_type_only(self):
"""Tier 4 should win over tier 5."""
router = BindingRouter(
[
{"agent": "type_agent", "discord": {"type": "channel"}},
{"agent": "guild_agent", "discord": {"guild": "123"}},
],
default_agent_id="default",
)
result = router.resolve(
platform="discord", guild_id="123", chat_type="channel",
)
assert result == "guild_agent"
def test_chat_type_beats_platform_only(self):
"""Tier 5 should win over tier 6."""
router = BindingRouter(
[
{"agent": "platform_agent", "telegram": "*"},
{"agent": "group_agent", "telegram": {"type": "group"}},
],
default_agent_id="default",
)
result = router.resolve(platform="telegram", chat_type="group")
assert result == "group_agent"
def test_chat_id_beats_guild_plus_chat_type(self):
"""Tier 1 beats tier 3."""
router = BindingRouter(
[
{"agent": "guild_type", "discord": {"guild": "123", "type": "channel"}},
{"agent": "exact", "discord": "chat999"},
],
default_agent_id="default",
)
result = router.resolve(
platform="discord", chat_id="chat999",
guild_id="123", chat_type="channel",
)
assert result == "exact"
# ── within-tier first-match-wins ─────────────────────────────────
def test_same_tier_first_match_wins(self):
"""Two tier-6 bindings: the first one listed should win."""
router = BindingRouter(
[
{"agent": "first", "telegram": "*"},
{"agent": "second", "telegram": "*"},
],
default_agent_id="default",
)
result = router.resolve(platform="telegram")
assert result == "first"
def test_same_tier_first_match_wins_chat_id(self):
"""Two tier-1 bindings for different chat_ids."""
router = BindingRouter(
[
{"agent": "first", "telegram": "aaa"},
{"agent": "second", "telegram": "bbb"},
],
default_agent_id="default",
)
assert router.resolve(platform="telegram", chat_id="aaa") == "first"
assert router.resolve(platform="telegram", chat_id="bbb") == "second"
# ── AND semantics: all fields must match ─────────────────────────
def test_and_semantics_guild_must_match(self):
"""Binding requires guild_id=123; different guild should not match."""
router = BindingRouter(
[{"agent": "guild_bot", "discord": {"guild": "123"}}],
default_agent_id="default",
)
assert router.resolve(platform="discord", guild_id="999") == "default"
def test_and_semantics_all_fields_required(self):
"""Binding requires guild_id AND chat_type; missing one → no match."""
router = BindingRouter(
[{"agent": "combo", "discord": {"guild": "123", "type": "channel"}}],
default_agent_id="default",
)
# Only guild_id, no chat_type → should NOT match
assert router.resolve(platform="discord", guild_id="123") == "default"
# Only chat_type, no guild_id → should NOT match
assert router.resolve(platform="discord", chat_type="channel") == "default"
# Both → should match
assert router.resolve(
platform="discord", guild_id="123", chat_type="channel",
) == "combo"
def test_and_semantics_platform_must_match(self):
"""Binding for telegram should not match discord."""
router = BindingRouter(
[{"agent": "tg", "telegram": "*"}],
default_agent_id="default",
)
assert router.resolve(platform="discord") == "default"
# ── no bindings uses default ─────────────────────────────────────
def test_no_bindings_returns_default(self):
router = BindingRouter([], default_agent_id="my_default")
assert router.resolve(platform="telegram") == "my_default"
def test_no_bindings_returns_default_with_all_kwargs(self):
router = BindingRouter([], default_agent_id="my_default")
assert router.resolve(
platform="telegram",
chat_id="123",
chat_type="group",
user_id="u1",
guild_id="g1",
team_id="t1",
) == "my_default"
# ═══════════════════════════════════════════════════════════════════════
# Edge cases
# ═══════════════════════════════════════════════════════════════════════
class TestEdgeCases:
"""Edge case tests."""
def test_empty_bindings_list(self):
router = BindingRouter([], default_agent_id="default")
assert router.resolve(platform="telegram") == "default"
def test_unknown_platform_falls_to_default(self):
"""Platform not in PLATFORM_NAMES doesn't match any binding."""
router = BindingRouter(
[{"agent": "a", "telegram": "*"}],
default_agent_id="default",
)
assert router.resolve(platform="matrix") == "default"
def test_unknown_platform_in_binding_ignored(self):
"""A binding with an unknown platform key produces empty match."""
b = normalize_binding({"agent": "a", "matrix": "*"})
assert b.match == {}
assert b.tier == 7
def test_binding_dataclass_frozen(self):
"""Binding is frozen; can't modify fields after creation."""
b = Binding(agent_id="a", match={"platform": "telegram"}, tier=6)
with pytest.raises(AttributeError):
b.agent_id = "b" # type: ignore[misc]
def test_binding_default_tier(self):
"""Default tier is 7."""
b = Binding(agent_id="a")
assert b.tier == 7
assert b.match == {}
def test_multiple_platforms_in_config(self):
"""Router handles multiple different platforms correctly."""
router = BindingRouter(
[
{"agent": "tg_bot", "telegram": "*"},
{"agent": "dc_bot", "discord": "*"},
{"agent": "sl_bot", "slack": "*"},
],
default_agent_id="default",
)
assert router.resolve(platform="telegram") == "tg_bot"
assert router.resolve(platform="discord") == "dc_bot"
assert router.resolve(platform="slack") == "sl_bot"
assert router.resolve(platform="whatsapp") == "default"
def test_bindings_sorted_by_tier(self):
"""Internal bindings list is sorted by tier (most specific first)."""
router = BindingRouter(
[
{"agent": "platform", "telegram": "*"}, # tier 6
{"agent": "exact", "telegram": "123"}, # tier 1
{"agent": "guild", "discord": {"guild": "1"}}, # tier 4
],
default_agent_id="default",
)
tiers = [b.tier for b in router._bindings]
assert tiers == sorted(tiers)
def test_team_id_match(self):
"""Binding with team_id matches when team_id is provided."""
router = BindingRouter(
[{"agent": "slack_team", "slack": {"team": "T01"}}],
default_agent_id="default",
)
assert router.resolve(platform="slack", team_id="T01") == "slack_team"
assert router.resolve(platform="slack", team_id="T99") == "default"
def test_complex_routing_scenario(self):
"""Full scenario with multiple tiers competing."""
router = BindingRouter(
[
{"agent": "fallback_tg", "telegram": "*"},
{"agent": "dev_chat", "telegram": "-100999"},
{"agent": "discord_general", "discord": "*"},
{"agent": "discord_guild", "discord": {"guild": "G1"}},
{"agent": "discord_guild_channel", "discord": {"guild": "G1", "type": "text"}},
],
default_agent_id="global_default",
)
# Telegram exact chat
assert router.resolve(
platform="telegram", chat_id="-100999",
) == "dev_chat"
# Telegram other chat → wildcard
assert router.resolve(
platform="telegram", chat_id="-100000",
) == "fallback_tg"
# Discord exact guild + type
assert router.resolve(
platform="discord", guild_id="G1", chat_type="text",
) == "discord_guild_channel"
# Discord guild only (no type)
assert router.resolve(
platform="discord", guild_id="G1",
) == "discord_guild"
# Discord other guild → platform wildcard
assert router.resolve(
platform="discord", guild_id="OTHER",
) == "discord_general"
# Unknown platform
assert router.resolve(platform="whatsapp") == "global_default"
def test_chat_type_alone_binding(self):
"""Tier 5: platform + chat_type only."""
router = BindingRouter(
[{"agent": "group_handler", "telegram": {"type": "group"}}],
default_agent_id="default",
)
assert router.resolve(
platform="telegram", chat_type="group",
) == "group_handler"
assert router.resolve(
platform="telegram", chat_type="private",
) == "default"
def test_resolve_with_none_values(self):
"""None values in kwargs should not match binding requirements."""
router = BindingRouter(
[{"agent": "guild_bot", "discord": {"guild": "123"}}],
default_agent_id="default",
)
# guild_id defaults to None
assert router.resolve(platform="discord") == "default"

View File

@@ -12,7 +12,7 @@ EXPECTED_COMMANDS = {
"/personality", "/clear", "/history", "/new", "/reset", "/retry", "/personality", "/clear", "/history", "/new", "/reset", "/retry",
"/undo", "/save", "/config", "/cron", "/skills", "/platforms", "/undo", "/save", "/config", "/cron", "/skills", "/platforms",
"/verbose", "/compress", "/title", "/usage", "/insights", "/paste", "/verbose", "/compress", "/title", "/usage", "/insights", "/paste",
"/reload-mcp", "/rollback", "/skin", "/quit", "/reload-mcp", "/rollback", "/skin", "/quit", "/agents",
} }

View File

@@ -245,6 +245,80 @@ class TestDelegateTask(unittest.TestCase):
self.assertEqual(kwargs["api_mode"], parent.api_mode) self.assertEqual(kwargs["api_mode"], parent.api_mode)
class TestSubagentModelConfig(unittest.TestCase):
"""Tests for configurable subagent model via config.yaml."""
def test_inherits_parent_model_by_default(self):
"""Without config, subagent uses parent's model."""
parent = _make_mock_parent(depth=0)
parent.model = "anthropic/claude-opus-4.6"
with (
patch("run_agent.AIAgent") as MockAgent,
patch("tools.delegate_tool._get_subagent_config", return_value={}),
):
mock_child = MagicMock()
mock_child.run_conversation.return_value = {
"final_response": "done", "completed": True, "api_calls": 1
}
MockAgent.return_value = mock_child
delegate_task(goal="Test default", parent_agent=parent)
_, kwargs = MockAgent.call_args
self.assertEqual(kwargs["model"], "anthropic/claude-opus-4.6")
def test_uses_config_model_override(self):
"""Subagent uses model from config.subagent.model."""
parent = _make_mock_parent(depth=0)
parent.model = "anthropic/claude-opus-4.6"
with (
patch("run_agent.AIAgent") as MockAgent,
patch("tools.delegate_tool._get_subagent_config",
return_value={"model": "google/gemini-3-flash-preview"}),
):
mock_child = MagicMock()
mock_child.run_conversation.return_value = {
"final_response": "done", "completed": True, "api_calls": 1
}
MockAgent.return_value = mock_child
delegate_task(goal="Test config model", parent_agent=parent)
_, kwargs = MockAgent.call_args
self.assertEqual(kwargs["model"], "google/gemini-3-flash-preview")
def test_explicit_model_overrides_config(self):
"""Explicit model arg takes precedence over config.subagent.model."""
parent = _make_mock_parent(depth=0)
with (
patch("run_agent.AIAgent") as MockAgent,
patch("tools.delegate_tool._get_subagent_config",
return_value={"model": "google/gemini-3-flash-preview"}),
):
mock_child = MagicMock()
mock_child.run_conversation.return_value = {
"final_response": "done", "completed": True, "api_calls": 1
}
MockAgent.return_value = mock_child
# _run_single_child receives model arg from delegate_task
# but delegate_task doesn't expose model directly — it always
# passes None. So config should win over parent, but explicit
# model (if exposed in future) would win over config.
delegate_task(goal="Test precedence", parent_agent=parent)
_, kwargs = MockAgent.call_args
self.assertEqual(kwargs["model"], "google/gemini-3-flash-preview")
def test_get_subagent_config_returns_empty_without_cli(self):
"""_get_subagent_config gracefully returns {} when CLI_CONFIG unavailable."""
from tools.delegate_tool import _get_subagent_config
# Simulate cli module being unavailable (e.g. gateway mode)
with patch.dict("sys.modules", {"cli": None}):
result = _get_subagent_config()
self.assertEqual(result, {})
class TestBlockedTools(unittest.TestCase): class TestBlockedTools(unittest.TestCase):
def test_blocked_tools_constant(self): def test_blocked_tools_constant(self):
for tool in ["delegate_task", "clarify", "memory", "send_message", "execute_code"]: for tool in ["delegate_task", "clarify", "memory", "send_message", "execute_code"]:

View File

@@ -78,6 +78,15 @@ def _strip_blocked_tools(toolsets: List[str]) -> List[str]:
return [t for t in toolsets if t not in blocked_toolset_names] return [t for t in toolsets if t not in blocked_toolset_names]
def _get_subagent_config() -> Dict[str, Any]:
"""Load subagent config from CLI_CONFIG if available."""
try:
from cli import CLI_CONFIG
return CLI_CONFIG.get("subagent", {})
except Exception:
return {}
def _build_child_progress_callback(task_index: int, parent_agent, task_count: int = 1) -> Optional[callable]: def _build_child_progress_callback(task_index: int, parent_agent, task_count: int = 1) -> Optional[callable]:
"""Build a callback that relays child agent tool calls to the parent display. """Build a callback that relays child agent tool calls to the parent display.
@@ -199,10 +208,18 @@ def _run_single_child(
# count toward the session-wide limit. # count toward the session-wide limit.
shared_budget = getattr(parent_agent, "iteration_budget", None) shared_budget = getattr(parent_agent, "iteration_budget", None)
# Subagent model override from config.
# Precedence: explicit model arg > config.subagent.model > parent model
subagent_cfg = _get_subagent_config()
effective_model = model or subagent_cfg.get("model") or parent_agent.model
# Inherit tool policy from parent if available
parent_tool_policy = getattr(parent_agent, '_agent_tool_policy', None)
child = AIAgent( child = AIAgent(
base_url=parent_agent.base_url, base_url=parent_agent.base_url,
api_key=parent_api_key, api_key=parent_api_key,
model=model or parent_agent.model, model=effective_model,
provider=getattr(parent_agent, "provider", None), provider=getattr(parent_agent, "provider", None),
api_mode=getattr(parent_agent, "api_mode", None), api_mode=getattr(parent_agent, "api_mode", None),
max_iterations=max_iterations, max_iterations=max_iterations,
@@ -210,6 +227,7 @@ def _run_single_child(
reasoning_config=getattr(parent_agent, "reasoning_config", None), reasoning_config=getattr(parent_agent, "reasoning_config", None),
prefill_messages=getattr(parent_agent, "prefill_messages", None), prefill_messages=getattr(parent_agent, "prefill_messages", None),
enabled_toolsets=child_toolsets, enabled_toolsets=child_toolsets,
agent_tool_policy=parent_tool_policy,
quiet_mode=True, quiet_mode=True,
ephemeral_system_prompt=child_prompt, ephemeral_system_prompt=child_prompt,
log_prefix=f"[subagent-{task_index}]", log_prefix=f"[subagent-{task_index}]",
@@ -312,12 +330,14 @@ def delegate_task(
if parent_agent is None: if parent_agent is None:
return json.dumps({"error": "delegate_task requires a parent agent context."}) return json.dumps({"error": "delegate_task requires a parent agent context."})
# Depth limit # Depth limit -- configurable per-agent via _max_spawn_depth
_raw_max = getattr(parent_agent, '_max_spawn_depth', None)
max_depth = _raw_max if isinstance(_raw_max, int) else MAX_DEPTH
depth = getattr(parent_agent, '_delegate_depth', 0) depth = getattr(parent_agent, '_delegate_depth', 0)
if depth >= MAX_DEPTH: if depth >= max_depth:
return json.dumps({ return json.dumps({
"error": ( "error": (
f"Delegation depth limit reached ({MAX_DEPTH}). " f"Delegation depth limit reached ({max_depth}). "
"Subagents cannot spawn further subagents." "Subagents cannot spawn further subagents."
) )
}) })

View File

@@ -95,20 +95,22 @@ class MemoryStore:
Tool responses always reflect this live state. Tool responses always reflect this live state.
""" """
def __init__(self, memory_char_limit: int = 2200, user_char_limit: int = 1375): def __init__(self, memory_char_limit: int = 2200, user_char_limit: int = 1375, memory_dir: Path = None):
self.memory_entries: List[str] = [] self.memory_entries: List[str] = []
self.user_entries: List[str] = [] self.user_entries: List[str] = []
self.memory_char_limit = memory_char_limit self.memory_char_limit = memory_char_limit
self.user_char_limit = user_char_limit self.user_char_limit = user_char_limit
self._memory_dir = memory_dir or MEMORY_DIR
self._memory_dir.mkdir(parents=True, exist_ok=True)
# Frozen snapshot for system prompt -- set once at load_from_disk() # Frozen snapshot for system prompt -- set once at load_from_disk()
self._system_prompt_snapshot: Dict[str, str] = {"memory": "", "user": ""} self._system_prompt_snapshot: Dict[str, str] = {"memory": "", "user": ""}
def load_from_disk(self): def load_from_disk(self):
"""Load entries from MEMORY.md and USER.md, capture system prompt snapshot.""" """Load entries from MEMORY.md and USER.md, capture system prompt snapshot."""
MEMORY_DIR.mkdir(parents=True, exist_ok=True) self._memory_dir.mkdir(parents=True, exist_ok=True)
self.memory_entries = self._read_file(MEMORY_DIR / "MEMORY.md") self.memory_entries = self._read_file(self._memory_dir / "MEMORY.md")
self.user_entries = self._read_file(MEMORY_DIR / "USER.md") self.user_entries = self._read_file(self._memory_dir / "USER.md")
# Deduplicate entries (preserves order, keeps first occurrence) # Deduplicate entries (preserves order, keeps first occurrence)
self.memory_entries = list(dict.fromkeys(self.memory_entries)) self.memory_entries = list(dict.fromkeys(self.memory_entries))
@@ -122,12 +124,12 @@ class MemoryStore:
def save_to_disk(self, target: str): def save_to_disk(self, target: str):
"""Persist entries to the appropriate file. Called after every mutation.""" """Persist entries to the appropriate file. Called after every mutation."""
MEMORY_DIR.mkdir(parents=True, exist_ok=True) self._memory_dir.mkdir(parents=True, exist_ok=True)
if target == "memory": if target == "memory":
self._write_file(MEMORY_DIR / "MEMORY.md", self.memory_entries) self._write_file(self._memory_dir / "MEMORY.md", self.memory_entries)
elif target == "user": elif target == "user":
self._write_file(MEMORY_DIR / "USER.md", self.user_entries) self._write_file(self._memory_dir / "USER.md", self.user_entries)
def _entries_for(self, target: str) -> List[str]: def _entries_for(self, target: str) -> List[str]:
if target == "user": if target == "user":