mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-29 07:21:37 +08:00
Compare commits
1 Commits
fix/plugin
...
hermes/her
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1b2272c2b8 |
@@ -8,6 +8,7 @@ persistence via bind mounts.
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import shlex
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
@@ -484,9 +485,13 @@ class DockerEnvironment(BaseEnvironment):
|
||||
else:
|
||||
effective_stdin = stdin_data
|
||||
|
||||
# docker exec -w doesn't expand ~, so prepend a cd into the command
|
||||
if work_dir == "~" or work_dir.startswith("~/"):
|
||||
exec_command = f"cd {work_dir} && {exec_command}"
|
||||
# docker exec -w doesn't expand ~, so prepend a cd into the command.
|
||||
# Keep ~ unquoted (for shell expansion) and quote only the subpath.
|
||||
if work_dir == "~":
|
||||
exec_command = f"cd ~ && {exec_command}"
|
||||
work_dir = "/"
|
||||
elif work_dir.startswith("~/"):
|
||||
exec_command = f"cd ~/{shlex.quote(work_dir[2:])} && {exec_command}"
|
||||
work_dir = "/"
|
||||
|
||||
assert self._container_id, "Container not started"
|
||||
|
||||
@@ -8,6 +8,7 @@ via writable overlay directories that survive across sessions.
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import shlex
|
||||
import shutil
|
||||
import subprocess
|
||||
import tempfile
|
||||
@@ -311,9 +312,13 @@ class SingularityEnvironment(BaseEnvironment):
|
||||
else:
|
||||
effective_stdin = stdin_data
|
||||
|
||||
# apptainer exec --pwd doesn't expand ~, so prepend a cd into the command
|
||||
if work_dir == "~" or work_dir.startswith("~/"):
|
||||
exec_command = f"cd {work_dir} && {exec_command}"
|
||||
# apptainer exec --pwd doesn't expand ~, so prepend a cd into the command.
|
||||
# Keep ~ unquoted (for shell expansion) and quote only the subpath.
|
||||
if work_dir == "~":
|
||||
exec_command = f"cd ~ && {exec_command}"
|
||||
work_dir = "/tmp"
|
||||
elif work_dir.startswith("~/"):
|
||||
exec_command = f"cd ~/{shlex.quote(work_dir[2:])} && {exec_command}"
|
||||
work_dir = "/tmp"
|
||||
|
||||
cmd = [self.executable, "exec", "--pwd", work_dir,
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
"""SSH remote execution environment with ControlMaster connection persistence."""
|
||||
|
||||
import logging
|
||||
import shlex
|
||||
import shutil
|
||||
import subprocess
|
||||
import tempfile
|
||||
@@ -228,7 +229,13 @@ class SSHEnvironment(PersistentShellMixin, BaseEnvironment):
|
||||
stdin_data: str | None = None) -> dict:
|
||||
work_dir = cwd or self.cwd
|
||||
exec_command, sudo_stdin = self._prepare_command(command)
|
||||
wrapped = f'cd {work_dir} && {exec_command}'
|
||||
# Keep ~ unquoted (for shell expansion) and quote only the subpath.
|
||||
if work_dir == "~":
|
||||
wrapped = f'cd ~ && {exec_command}'
|
||||
elif work_dir.startswith("~/"):
|
||||
wrapped = f'cd ~/{shlex.quote(work_dir[2:])} && {exec_command}'
|
||||
else:
|
||||
wrapped = f'cd {shlex.quote(work_dir)} && {exec_command}'
|
||||
effective_timeout = timeout or self.timeout
|
||||
|
||||
if sudo_stdin is not None and stdin_data is not None:
|
||||
|
||||
@@ -154,6 +154,34 @@ def _check_all_guards(command: str, env_type: str) -> dict:
|
||||
approval_callback=_approval_callback)
|
||||
|
||||
|
||||
# Allowlist: characters that can legitimately appear in directory paths.
|
||||
# Covers alphanumeric, path separators, tilde, dot, hyphen, underscore, space,
|
||||
# plus, at, equals, and comma. Everything else is rejected.
|
||||
_WORKDIR_SAFE_RE = re.compile(r'^[A-Za-z0-9/_\-.~ +@=,]+$')
|
||||
|
||||
|
||||
def _validate_workdir(workdir: str) -> str | None:
|
||||
"""Reject workdir values that don't look like a filesystem path.
|
||||
|
||||
Uses an allowlist of safe characters rather than a deny-list, so novel
|
||||
shell metacharacters can't slip through.
|
||||
|
||||
Returns None if safe, or an error message string if dangerous.
|
||||
"""
|
||||
if not workdir:
|
||||
return None
|
||||
if not _WORKDIR_SAFE_RE.match(workdir):
|
||||
# Find the first offending character for a helpful message.
|
||||
for ch in workdir:
|
||||
if not _WORKDIR_SAFE_RE.match(ch):
|
||||
return (
|
||||
f"Blocked: workdir contains disallowed character {repr(ch)}. "
|
||||
"Use a simple filesystem path without shell metacharacters."
|
||||
)
|
||||
return "Blocked: workdir contains disallowed characters."
|
||||
return None
|
||||
|
||||
|
||||
def _handle_sudo_failure(output: str, env_type: str) -> str:
|
||||
"""
|
||||
Check for sudo failure and add helpful message for messaging contexts.
|
||||
@@ -1166,6 +1194,19 @@ def terminal_tool(
|
||||
desc = approval.get("description", "flagged as dangerous")
|
||||
approval_note = f"Command was flagged ({desc}) and auto-approved by smart approval."
|
||||
|
||||
# Validate workdir against shell injection
|
||||
if workdir:
|
||||
workdir_error = _validate_workdir(workdir)
|
||||
if workdir_error:
|
||||
logger.warning("Blocked dangerous workdir: %s (command: %s)",
|
||||
workdir[:200], command[:200])
|
||||
return json.dumps({
|
||||
"output": "",
|
||||
"exit_code": -1,
|
||||
"error": workdir_error,
|
||||
"status": "blocked"
|
||||
}, ensure_ascii=False)
|
||||
|
||||
# Prepare command for execution
|
||||
if background:
|
||||
# Spawn a tracked background process via the process registry.
|
||||
|
||||
Reference in New Issue
Block a user