Compare commits

...

1 Commits

Author SHA1 Message Date
Mariano A. Nicolini
1b2272c2b8 fix(security): sanitize workdir parameter in terminal tool backends
Shell injection via unquoted workdir interpolation in docker, singularity,
and SSH backends.  When workdir contained shell metacharacters (e.g.
~/;id), arbitrary commands could execute.

Changes:
- Add shlex.quote() at each interpolation point in docker.py,
  singularity.py, and ssh.py with tilde-aware quoting (keep ~
  unquoted for shell expansion, quote only the subpath)
- Add _validate_workdir() allowlist in terminal_tool.py as
  defense-in-depth before workdir reaches any backend

Original work by Mariano A. Nicolini (PR #5620).  Salvaged with fixes
for tilde expansion (shlex.quote breaks cd ~/path) and replaced
incomplete deny-list with strict character allowlist.

Co-authored-by: Mariano A. Nicolini <entropidelic@users.noreply.github.com>
2026-04-06 13:16:25 -07:00
4 changed files with 65 additions and 7 deletions

View File

@@ -8,6 +8,7 @@ persistence via bind mounts.
import logging import logging
import os import os
import re import re
import shlex
import shutil import shutil
import subprocess import subprocess
import sys import sys
@@ -484,9 +485,13 @@ class DockerEnvironment(BaseEnvironment):
else: else:
effective_stdin = stdin_data effective_stdin = stdin_data
# docker exec -w doesn't expand ~, so prepend a cd into the command # docker exec -w doesn't expand ~, so prepend a cd into the command.
if work_dir == "~" or work_dir.startswith("~/"): # Keep ~ unquoted (for shell expansion) and quote only the subpath.
exec_command = f"cd {work_dir} && {exec_command}" if work_dir == "~":
exec_command = f"cd ~ && {exec_command}"
work_dir = "/"
elif work_dir.startswith("~/"):
exec_command = f"cd ~/{shlex.quote(work_dir[2:])} && {exec_command}"
work_dir = "/" work_dir = "/"
assert self._container_id, "Container not started" assert self._container_id, "Container not started"

View File

@@ -8,6 +8,7 @@ via writable overlay directories that survive across sessions.
import json import json
import logging import logging
import os import os
import shlex
import shutil import shutil
import subprocess import subprocess
import tempfile import tempfile
@@ -311,9 +312,13 @@ class SingularityEnvironment(BaseEnvironment):
else: else:
effective_stdin = stdin_data effective_stdin = stdin_data
# apptainer exec --pwd doesn't expand ~, so prepend a cd into the command # apptainer exec --pwd doesn't expand ~, so prepend a cd into the command.
if work_dir == "~" or work_dir.startswith("~/"): # Keep ~ unquoted (for shell expansion) and quote only the subpath.
exec_command = f"cd {work_dir} && {exec_command}" if work_dir == "~":
exec_command = f"cd ~ && {exec_command}"
work_dir = "/tmp"
elif work_dir.startswith("~/"):
exec_command = f"cd ~/{shlex.quote(work_dir[2:])} && {exec_command}"
work_dir = "/tmp" work_dir = "/tmp"
cmd = [self.executable, "exec", "--pwd", work_dir, cmd = [self.executable, "exec", "--pwd", work_dir,

View File

@@ -1,6 +1,7 @@
"""SSH remote execution environment with ControlMaster connection persistence.""" """SSH remote execution environment with ControlMaster connection persistence."""
import logging import logging
import shlex
import shutil import shutil
import subprocess import subprocess
import tempfile import tempfile
@@ -228,7 +229,13 @@ class SSHEnvironment(PersistentShellMixin, BaseEnvironment):
stdin_data: str | None = None) -> dict: stdin_data: str | None = None) -> dict:
work_dir = cwd or self.cwd work_dir = cwd or self.cwd
exec_command, sudo_stdin = self._prepare_command(command) exec_command, sudo_stdin = self._prepare_command(command)
wrapped = f'cd {work_dir} && {exec_command}' # Keep ~ unquoted (for shell expansion) and quote only the subpath.
if work_dir == "~":
wrapped = f'cd ~ && {exec_command}'
elif work_dir.startswith("~/"):
wrapped = f'cd ~/{shlex.quote(work_dir[2:])} && {exec_command}'
else:
wrapped = f'cd {shlex.quote(work_dir)} && {exec_command}'
effective_timeout = timeout or self.timeout effective_timeout = timeout or self.timeout
if sudo_stdin is not None and stdin_data is not None: if sudo_stdin is not None and stdin_data is not None:

View File

@@ -154,6 +154,34 @@ def _check_all_guards(command: str, env_type: str) -> dict:
approval_callback=_approval_callback) approval_callback=_approval_callback)
# Allowlist: characters that can legitimately appear in directory paths.
# Covers alphanumeric, path separators, tilde, dot, hyphen, underscore, space,
# plus, at, equals, and comma. Everything else is rejected.
_WORKDIR_SAFE_RE = re.compile(r'^[A-Za-z0-9/_\-.~ +@=,]+$')
def _validate_workdir(workdir: str) -> str | None:
"""Reject workdir values that don't look like a filesystem path.
Uses an allowlist of safe characters rather than a deny-list, so novel
shell metacharacters can't slip through.
Returns None if safe, or an error message string if dangerous.
"""
if not workdir:
return None
if not _WORKDIR_SAFE_RE.match(workdir):
# Find the first offending character for a helpful message.
for ch in workdir:
if not _WORKDIR_SAFE_RE.match(ch):
return (
f"Blocked: workdir contains disallowed character {repr(ch)}. "
"Use a simple filesystem path without shell metacharacters."
)
return "Blocked: workdir contains disallowed characters."
return None
def _handle_sudo_failure(output: str, env_type: str) -> str: def _handle_sudo_failure(output: str, env_type: str) -> str:
""" """
Check for sudo failure and add helpful message for messaging contexts. Check for sudo failure and add helpful message for messaging contexts.
@@ -1166,6 +1194,19 @@ def terminal_tool(
desc = approval.get("description", "flagged as dangerous") desc = approval.get("description", "flagged as dangerous")
approval_note = f"Command was flagged ({desc}) and auto-approved by smart approval." approval_note = f"Command was flagged ({desc}) and auto-approved by smart approval."
# Validate workdir against shell injection
if workdir:
workdir_error = _validate_workdir(workdir)
if workdir_error:
logger.warning("Blocked dangerous workdir: %s (command: %s)",
workdir[:200], command[:200])
return json.dumps({
"output": "",
"exit_code": -1,
"error": workdir_error,
"status": "blocked"
}, ensure_ascii=False)
# Prepare command for execution # Prepare command for execution
if background: if background:
# Spawn a tracked background process via the process registry. # Spawn a tracked background process via the process registry.