Compare commits

...

1 Commits

Author SHA1 Message Date
Mariano A. Nicolini
1b2272c2b8 fix(security): sanitize workdir parameter in terminal tool backends
Shell injection via unquoted workdir interpolation in docker, singularity,
and SSH backends.  When workdir contained shell metacharacters (e.g.
~/;id), arbitrary commands could execute.

Changes:
- Add shlex.quote() at each interpolation point in docker.py,
  singularity.py, and ssh.py with tilde-aware quoting (keep ~
  unquoted for shell expansion, quote only the subpath)
- Add _validate_workdir() allowlist in terminal_tool.py as
  defense-in-depth before workdir reaches any backend

Original work by Mariano A. Nicolini (PR #5620).  Salvaged with fixes
for tilde expansion (shlex.quote breaks cd ~/path) and replaced
incomplete deny-list with strict character allowlist.

Co-authored-by: Mariano A. Nicolini <entropidelic@users.noreply.github.com>
2026-04-06 13:16:25 -07:00
4 changed files with 65 additions and 7 deletions

View File

@@ -8,6 +8,7 @@ persistence via bind mounts.
import logging
import os
import re
import shlex
import shutil
import subprocess
import sys
@@ -484,9 +485,13 @@ class DockerEnvironment(BaseEnvironment):
else:
effective_stdin = stdin_data
# docker exec -w doesn't expand ~, so prepend a cd into the command
if work_dir == "~" or work_dir.startswith("~/"):
exec_command = f"cd {work_dir} && {exec_command}"
# docker exec -w doesn't expand ~, so prepend a cd into the command.
# Keep ~ unquoted (for shell expansion) and quote only the subpath.
if work_dir == "~":
exec_command = f"cd ~ && {exec_command}"
work_dir = "/"
elif work_dir.startswith("~/"):
exec_command = f"cd ~/{shlex.quote(work_dir[2:])} && {exec_command}"
work_dir = "/"
assert self._container_id, "Container not started"

View File

@@ -8,6 +8,7 @@ via writable overlay directories that survive across sessions.
import json
import logging
import os
import shlex
import shutil
import subprocess
import tempfile
@@ -311,9 +312,13 @@ class SingularityEnvironment(BaseEnvironment):
else:
effective_stdin = stdin_data
# apptainer exec --pwd doesn't expand ~, so prepend a cd into the command
if work_dir == "~" or work_dir.startswith("~/"):
exec_command = f"cd {work_dir} && {exec_command}"
# apptainer exec --pwd doesn't expand ~, so prepend a cd into the command.
# Keep ~ unquoted (for shell expansion) and quote only the subpath.
if work_dir == "~":
exec_command = f"cd ~ && {exec_command}"
work_dir = "/tmp"
elif work_dir.startswith("~/"):
exec_command = f"cd ~/{shlex.quote(work_dir[2:])} && {exec_command}"
work_dir = "/tmp"
cmd = [self.executable, "exec", "--pwd", work_dir,

View File

@@ -1,6 +1,7 @@
"""SSH remote execution environment with ControlMaster connection persistence."""
import logging
import shlex
import shutil
import subprocess
import tempfile
@@ -228,7 +229,13 @@ class SSHEnvironment(PersistentShellMixin, BaseEnvironment):
stdin_data: str | None = None) -> dict:
work_dir = cwd or self.cwd
exec_command, sudo_stdin = self._prepare_command(command)
wrapped = f'cd {work_dir} && {exec_command}'
# Keep ~ unquoted (for shell expansion) and quote only the subpath.
if work_dir == "~":
wrapped = f'cd ~ && {exec_command}'
elif work_dir.startswith("~/"):
wrapped = f'cd ~/{shlex.quote(work_dir[2:])} && {exec_command}'
else:
wrapped = f'cd {shlex.quote(work_dir)} && {exec_command}'
effective_timeout = timeout or self.timeout
if sudo_stdin is not None and stdin_data is not None:

View File

@@ -154,6 +154,34 @@ def _check_all_guards(command: str, env_type: str) -> dict:
approval_callback=_approval_callback)
# Allowlist: characters that can legitimately appear in directory paths.
# Covers alphanumeric, path separators, tilde, dot, hyphen, underscore, space,
# plus, at, equals, and comma. Everything else is rejected.
_WORKDIR_SAFE_RE = re.compile(r'^[A-Za-z0-9/_\-.~ +@=,]+$')
def _validate_workdir(workdir: str) -> str | None:
"""Reject workdir values that don't look like a filesystem path.
Uses an allowlist of safe characters rather than a deny-list, so novel
shell metacharacters can't slip through.
Returns None if safe, or an error message string if dangerous.
"""
if not workdir:
return None
if not _WORKDIR_SAFE_RE.match(workdir):
# Find the first offending character for a helpful message.
for ch in workdir:
if not _WORKDIR_SAFE_RE.match(ch):
return (
f"Blocked: workdir contains disallowed character {repr(ch)}. "
"Use a simple filesystem path without shell metacharacters."
)
return "Blocked: workdir contains disallowed characters."
return None
def _handle_sudo_failure(output: str, env_type: str) -> str:
"""
Check for sudo failure and add helpful message for messaging contexts.
@@ -1166,6 +1194,19 @@ def terminal_tool(
desc = approval.get("description", "flagged as dangerous")
approval_note = f"Command was flagged ({desc}) and auto-approved by smart approval."
# Validate workdir against shell injection
if workdir:
workdir_error = _validate_workdir(workdir)
if workdir_error:
logger.warning("Blocked dangerous workdir: %s (command: %s)",
workdir[:200], command[:200])
return json.dumps({
"output": "",
"exit_code": -1,
"error": workdir_error,
"status": "blocked"
}, ensure_ascii=False)
# Prepare command for execution
if background:
# Spawn a tracked background process via the process registry.