diff --git a/tools/environments/docker.py b/tools/environments/docker.py index ea553a7b60..1d2d325cba 100644 --- a/tools/environments/docker.py +++ b/tools/environments/docker.py @@ -8,6 +8,7 @@ persistence via bind mounts. import logging import os import re +import shlex import shutil import subprocess import sys @@ -484,9 +485,13 @@ class DockerEnvironment(BaseEnvironment): else: effective_stdin = stdin_data - # docker exec -w doesn't expand ~, so prepend a cd into the command - if work_dir == "~" or work_dir.startswith("~/"): - exec_command = f"cd {work_dir} && {exec_command}" + # docker exec -w doesn't expand ~, so prepend a cd into the command. + # Keep ~ unquoted (for shell expansion) and quote only the subpath. + if work_dir == "~": + exec_command = f"cd ~ && {exec_command}" + work_dir = "/" + elif work_dir.startswith("~/"): + exec_command = f"cd ~/{shlex.quote(work_dir[2:])} && {exec_command}" work_dir = "/" assert self._container_id, "Container not started" diff --git a/tools/environments/singularity.py b/tools/environments/singularity.py index 89d9ffb04b..6643ea1b3f 100644 --- a/tools/environments/singularity.py +++ b/tools/environments/singularity.py @@ -8,6 +8,7 @@ via writable overlay directories that survive across sessions. import json import logging import os +import shlex import shutil import subprocess import tempfile @@ -311,9 +312,13 @@ class SingularityEnvironment(BaseEnvironment): else: effective_stdin = stdin_data - # apptainer exec --pwd doesn't expand ~, so prepend a cd into the command - if work_dir == "~" or work_dir.startswith("~/"): - exec_command = f"cd {work_dir} && {exec_command}" + # apptainer exec --pwd doesn't expand ~, so prepend a cd into the command. + # Keep ~ unquoted (for shell expansion) and quote only the subpath. + if work_dir == "~": + exec_command = f"cd ~ && {exec_command}" + work_dir = "/tmp" + elif work_dir.startswith("~/"): + exec_command = f"cd ~/{shlex.quote(work_dir[2:])} && {exec_command}" work_dir = "/tmp" cmd = [self.executable, "exec", "--pwd", work_dir, diff --git a/tools/environments/ssh.py b/tools/environments/ssh.py index 387dea34e0..afd28c4aff 100644 --- a/tools/environments/ssh.py +++ b/tools/environments/ssh.py @@ -1,6 +1,7 @@ """SSH remote execution environment with ControlMaster connection persistence.""" import logging +import shlex import shutil import subprocess import tempfile @@ -228,7 +229,13 @@ class SSHEnvironment(PersistentShellMixin, BaseEnvironment): stdin_data: str | None = None) -> dict: work_dir = cwd or self.cwd exec_command, sudo_stdin = self._prepare_command(command) - wrapped = f'cd {work_dir} && {exec_command}' + # Keep ~ unquoted (for shell expansion) and quote only the subpath. + if work_dir == "~": + wrapped = f'cd ~ && {exec_command}' + elif work_dir.startswith("~/"): + wrapped = f'cd ~/{shlex.quote(work_dir[2:])} && {exec_command}' + else: + wrapped = f'cd {shlex.quote(work_dir)} && {exec_command}' effective_timeout = timeout or self.timeout if sudo_stdin is not None and stdin_data is not None: diff --git a/tools/terminal_tool.py b/tools/terminal_tool.py index 26591ceedb..be565f1966 100644 --- a/tools/terminal_tool.py +++ b/tools/terminal_tool.py @@ -154,6 +154,34 @@ def _check_all_guards(command: str, env_type: str) -> dict: approval_callback=_approval_callback) +# Allowlist: characters that can legitimately appear in directory paths. +# Covers alphanumeric, path separators, tilde, dot, hyphen, underscore, space, +# plus, at, equals, and comma. Everything else is rejected. +_WORKDIR_SAFE_RE = re.compile(r'^[A-Za-z0-9/_\-.~ +@=,]+$') + + +def _validate_workdir(workdir: str) -> str | None: + """Reject workdir values that don't look like a filesystem path. + + Uses an allowlist of safe characters rather than a deny-list, so novel + shell metacharacters can't slip through. + + Returns None if safe, or an error message string if dangerous. + """ + if not workdir: + return None + if not _WORKDIR_SAFE_RE.match(workdir): + # Find the first offending character for a helpful message. + for ch in workdir: + if not _WORKDIR_SAFE_RE.match(ch): + return ( + f"Blocked: workdir contains disallowed character {repr(ch)}. " + "Use a simple filesystem path without shell metacharacters." + ) + return "Blocked: workdir contains disallowed characters." + return None + + def _handle_sudo_failure(output: str, env_type: str) -> str: """ Check for sudo failure and add helpful message for messaging contexts. @@ -1166,6 +1194,19 @@ def terminal_tool( desc = approval.get("description", "flagged as dangerous") approval_note = f"Command was flagged ({desc}) and auto-approved by smart approval." + # Validate workdir against shell injection + if workdir: + workdir_error = _validate_workdir(workdir) + if workdir_error: + logger.warning("Blocked dangerous workdir: %s (command: %s)", + workdir[:200], command[:200]) + return json.dumps({ + "output": "", + "exit_code": -1, + "error": workdir_error, + "status": "blocked" + }, ensure_ascii=False) + # Prepare command for execution if background: # Spawn a tracked background process via the process registry.