diff --git a/tests/tools/test_daytona_environment.py b/tests/tools/test_daytona_environment.py
index 72443bc5454..54039379174 100644
--- a/tests/tools/test_daytona_environment.py
+++ b/tests/tools/test_daytona_environment.py
@@ -336,7 +336,7 @@ class TestExecute:
         assert "/tmp" in cmd
 
     def test_daytona_error_returns_error_result(self, make_env, daytona_sdk):
-        """In the unified model, SDK errors are caught by _DaytonaProcessHandle
+        """In the unified model, SDK errors are caught by _ThreadedProcessHandle
         and returned as error results (no automatic retry)."""
         sb = _make_sandbox()
         sb.state = "started"
@@ -347,7 +347,7 @@ class TestExecute:
         sb.process.exec.side_effect = daytona_sdk.DaytonaError("transient")
         result = env.execute("echo retry")
         assert result["returncode"] == 1
-        assert "Daytona execution error" in result["output"]
+        assert "transient" in result["output"]
 
 
 # ---------------------------------------------------------------------------
@@ -408,7 +408,7 @@ class TestInterrupt:
 
 class TestSdkError:
     def test_sdk_error_returns_error_result(self, make_env, daytona_sdk):
-        """SDK errors in _DaytonaProcessHandle are caught and returned cleanly."""
+        """SDK errors in _ThreadedProcessHandle are caught and returned cleanly."""
         sb = _make_sandbox()
         sb.state = "started"
         sb.process.exec.return_value = _make_exec_response(result="/root")
@@ -418,7 +418,7 @@ class TestSdkError:
         sb.process.exec.side_effect = daytona_sdk.DaytonaError("fail")
         result = env.execute("echo x")
         assert result["returncode"] == 1
-        assert "Daytona execution error" in result["output"]
+        assert "fail" in result["output"]
 
 
 # ---------------------------------------------------------------------------
diff --git a/tests/tools/test_threaded_process_handle.py b/tests/tools/test_threaded_process_handle.py
new file mode 100644
index 00000000000..35472ec4cb5
--- /dev/null
+++ b/tests/tools/test_threaded_process_handle.py
@@ -0,0 +1,63 @@
+"""Unit tests for the _ThreadedProcessHandle adapter in base.py."""
+
+import threading
+
+import pytest
+
+
+def test_successful_execution():
+    """exec_fn returns (output, 0) -> returncode==0 and stdout reads output."""
+    from tools.environments.base import _ThreadedProcessHandle
+
+    handle = _ThreadedProcessHandle(lambda: ("hello output", 0))
+    handle.wait()
+    assert handle.returncode == 0
+    text = handle.stdout.read()
+    assert text == "hello output"
+    handle.stdout.close()
+
+
+def test_nonzero_exit_code():
+    """exec_fn returns (output, 42) -> returncode==42."""
+    from tools.environments.base import _ThreadedProcessHandle
+
+    handle = _ThreadedProcessHandle(lambda: ("error output", 42))
+    handle.wait()
+    assert handle.returncode == 42
+    text = handle.stdout.read()
+    assert text == "error output"
+    handle.stdout.close()
+
+
+def test_exception_returns_rc1():
+    """exec_fn raises RuntimeError -> returncode==1 and error message in stdout."""
+    from tools.environments.base import _ThreadedProcessHandle
+
+    def failing_fn():
+        raise RuntimeError("boom")
+
+    handle = _ThreadedProcessHandle(failing_fn)
+    handle.wait()
+    assert handle.returncode == 1
+    text = handle.stdout.read()
+    assert "boom" in text
+    handle.stdout.close()
+
+
+def test_poll_returns_none_while_running():
+    """poll() returns None before exec_fn completes."""
+    from tools.environments.base import _ThreadedProcessHandle
+
+    barrier = threading.Event()
+
+    def blocking_fn():
+        barrier.wait(timeout=5)
+        return ("done", 0)
+
+    handle = _ThreadedProcessHandle(blocking_fn)
+    assert handle.poll() is None
+    barrier.set()
+    handle.wait()
+    assert handle.poll() == 0
+    handle.stdout.read()
+    handle.stdout.close()
diff --git a/tools/environments/base.py b/tools/environments/base.py
index 6eba24c0bba..a144f1115e6 100644
--- a/tools/environments/base.py
+++ b/tools/environments/base.py
@@ -4,7 +4,6 @@ from abc import ABC, abstractmethod
 import logging
 import os
 import shlex
-import subprocess
 import threading
 import time
 import uuid
@@ -55,6 +54,55 @@ class ProcessHandle(Protocol):
     def returncode(self) -> int | None: ...
 
 
+class _ThreadedProcessHandle:
+    """ProcessHandle adapter for SDK backends that run in a background thread."""
+
+    def __init__(self, exec_fn):
+        self._done = threading.Event()
+        self._returncode = None
+        self._read_fd, self._write_fd = os.pipe()
+        self.stdout = os.fdopen(self._read_fd, "r")
+        self.stdin = None
+
+        def _run():
+            try:
+                output, exit_code = exec_fn()
+                writer = os.fdopen(self._write_fd, "w")
+                writer.write(output)
+                writer.close()
+                self._returncode = exit_code
+            except Exception as e:
+                try:
+                    writer = os.fdopen(self._write_fd, "w")
+                    writer.write(str(e))
+                    writer.close()
+                except Exception:
+                    try:
+                        os.close(self._write_fd)
+                    except Exception:
+                        pass
+                self._returncode = 1
+            finally:
+                self._done.set()
+
+        self._thread = threading.Thread(target=_run, daemon=True)
+        self._thread.start()
+
+    def poll(self):
+        return self._returncode if self._done.is_set() else None
+
+    def kill(self):
+        pass
+
+    def wait(self, timeout=None):
+        self._done.wait(timeout=timeout)
+        return self._returncode
+
+    @property
+    def returncode(self):
+        return self._returncode
+
+
 class BaseEnvironment(ABC):
     """Common interface for all Hermes execution backends.
 
@@ -82,12 +130,18 @@ class BaseEnvironment(ABC):
     # ------------------------------------------------------------------
 
     def _run_bash(self, cmd_string: str, *,
+                  timeout: int | None = None,
                   stdin_data: str | None = None) -> ProcessHandle:
        """Spawn ``bash -c <cmd_string>`` in the backend.
 
         Returns a ProcessHandle (subprocess.Popen or equivalent adapter).
         The caller owns polling, timeout, output collection, and cleanup.
 
+        *timeout* is the effective per-command timeout.  Backends that use
+        SDK-level or shell-level timeouts (Modal, Daytona) should forward
+        this value.  Backends where timeout is enforced by
+        ``_wait_for_process`` (local, docker, ssh, singularity) may ignore it.
+
         If *stdin_data* is provided, write it to the process's stdin and close.
         Backends that cannot pipe stdin (Modal, Daytona) must embed it via
         heredoc in *cmd_string* before calling their SDK.
@@ -112,15 +166,15 @@
     # Snapshot — login-shell env capture (called once at session init)
     # ------------------------------------------------------------------
 
-    def _run_bash_login(self, cmd_string: str) -> ProcessHandle:
+    def _run_bash_login(self, cmd_string: str, *,
+                        timeout: int | None = None) -> ProcessHandle:
         """Spawn ``bash -l -c <cmd_string>`` for snapshot creation.
 
         Defaults to ``_run_bash`` — backends override this when the login
         flag needs different handling (e.g. local adds ``-l`` to Popen args).
         """
-        return self._run_bash(cmd_string)
+        return self._run_bash(cmd_string, timeout=timeout)
 
-    # Subclasses may increase for slow backends (Modal, Daytona cold start)
     _snapshot_timeout: int = 15
 
     def init_session(self):
@@ -145,7 +199,7 @@
 
         result = {}
         try:
-            proc = self._run_bash_login(bootstrap)
+            proc = self._run_bash_login(bootstrap, timeout=self._snapshot_timeout)
             result = self._wait_for_process(proc, timeout=self._snapshot_timeout)
             if result["returncode"] == 0:
                 self._snapshot_ready = True
@@ -160,13 +214,39 @@
         except Exception as e:
             logger.warning("Snapshot creation failed: %s", e)
 
-        # Pick up the reported cwd from bootstrap stdout
         self._extract_cwd_from_output(result)
 
     # ------------------------------------------------------------------
     # Command wrapping
     # ------------------------------------------------------------------
 
+    @staticmethod
+    def _embed_stdin_heredoc(cmd_string: str, stdin_data: str) -> str:
+        """Wrap *stdin_data* as a shell heredoc appended to *cmd_string*.
+
+        Used by backends that cannot pipe stdin (Modal, Daytona, ManagedModal).
+        """
+        marker = f"HERMES_EOF_{uuid.uuid4().hex[:8]}"
+        while marker in stdin_data:
+            marker = f"HERMES_EOF_{uuid.uuid4().hex[:8]}"
+        return f"{cmd_string} << '{marker}'\n{stdin_data}\n{marker}"
+
+    def _resolve_tilde(self, path: str) -> str:
+        """Expand ``~`` to the actual home directory path.
+
+        Remote backends (SSH, Daytona) set ``_remote_home`` during init;
+        local uses ``os.path.expanduser``.  Tilde must be resolved before
+        ``shlex.quote`` since single-quoting prevents shell tilde expansion.
+        """
+        if not path or not path.startswith("~"):
+            return path
+        home = getattr(self, "_remote_home", None) or os.path.expanduser("~")
+        if path == "~":
+            return home
+        if path.startswith("~/"):
+            return home + path[1:]
+        return path  # ~otheruser — leave for shell to handle
+
     def _wrap_command(self, command: str, cwd: str) -> str:
         """Wrap a user command with snapshot sourcing and CWD tracking.
 
@@ -180,16 +260,17 @@
             f"source {self._snapshot_path} 2>/dev/null || true"
         )
-        # 2. cd to working directory
+        # 2. cd to working directory (resolve ~ before quoting)
         work_dir = cwd or self.cwd
         if work_dir:
+            work_dir = self._resolve_tilde(work_dir)
             parts.append(f"cd {shlex.quote(work_dir)} || exit 1")
 
         # 3. The actual command (eval to handle complex shell syntax)
         escaped = command.replace("'", "'\\''")
         parts.append(f"eval '{escaped}'")
 
-        # 4. Capture exit code, echo CWD to stdout for local parsing
+        # 4. Capture exit code, record CWD
         parts.append("__hermes_ec=$?")
         parts.append(
             f"printf '\\n{_CWD_MARKER}%s{_CWD_MARKER}\\n' \"$(pwd -P)\""
         )
@@ -222,10 +303,10 @@
         wrapped = self._wrap_command(exec_command, cwd)
         effective_timeout = timeout or self.timeout
 
-        proc = self._run_bash(wrapped, stdin_data=effective_stdin)
+        proc = self._run_bash(wrapped, timeout=effective_timeout,
+                              stdin_data=effective_stdin)
         result = self._wait_for_process(proc, timeout=effective_timeout)
 
-        # Extract CWD from stdout (in-band, no remote file read needed)
         self._extract_cwd_from_output(result)
         return result
 
@@ -250,28 +331,36 @@
         reader.start()
 
         deadline = time.monotonic() + timeout
-        while proc.poll() is None:
-            if is_interrupted():
-                self._kill_process(proc)
-                reader.join(timeout=2)
-                partial = "".join(output_chunks)
-                return {
-                    "output": partial + "\n[Command interrupted]",
-                    "returncode": 130,
-                }
-            if time.monotonic() > deadline:
-                self._kill_process(proc)
-                reader.join(timeout=2)
-                partial = "".join(output_chunks)
-                msg = f"\n[Command timed out after {timeout}s]"
-                return {
-                    "output": (partial + msg) if partial else msg.lstrip(),
-                    "returncode": 124,
-                }
-            time.sleep(0.2)
+        try:
+            while proc.poll() is None:
+                if is_interrupted():
+                    self._kill_process(proc)
+                    reader.join(timeout=2)
+                    partial = "".join(output_chunks)
+                    return {
+                        "output": partial + "\n[Command interrupted]",
+                        "returncode": 130,
+                    }
+                if time.monotonic() > deadline:
+                    self._kill_process(proc)
+                    reader.join(timeout=2)
+                    partial = "".join(output_chunks)
+                    msg = f"\n[Command timed out after {timeout}s]"
+                    return {
+                        "output": (partial + msg) if partial else msg.lstrip(),
+                        "returncode": 124,
+                    }
+                time.sleep(0.2)
 
-        reader.join(timeout=5)
-        return {"output": "".join(output_chunks), "returncode": proc.returncode}
+            reader.join(timeout=5)
+            return {"output": "".join(output_chunks), "returncode": proc.returncode}
+        finally:
+            # Close the stdout pipe to prevent FD leaks, especially for
+            # SDK-backed handles (Modal, Daytona) that use os.pipe().
+            try:
+                proc.stdout.close()
+            except Exception:
+                pass
 
     def _kill_process(self, proc: ProcessHandle):
         """Kill a process.  Backends may override for process-group kill."""
@@ -288,7 +377,7 @@
             pass
 
     # ------------------------------------------------------------------
-    # CWD tracking (in-band via stdout, no remote file reads)
+    # CWD tracking
     # ------------------------------------------------------------------
 
     def _extract_cwd_from_output(self, result: dict) -> dict:
@@ -340,47 +429,10 @@
             pass
 
     # ------------------------------------------------------------------
-    # Shared helpers (kept for backward compat during migration)
+    # Shared helpers
     # ------------------------------------------------------------------
 
     def _prepare_command(self, command: str) -> tuple[str, str | None]:
         """Transform sudo commands if SUDO_PASSWORD is available."""
         from tools.terminal_tool import _transform_sudo_command
         return _transform_sudo_command(command)
-
-    def _build_run_kwargs(self, timeout: int | None,
-                          stdin_data: str | None = None) -> dict:
-        """Build common subprocess.run kwargs for non-interactive execution."""
-        kw = {
-            "text": True,
-            "timeout": timeout or self.timeout,
-            "encoding": "utf-8",
-            "errors": "replace",
-            "stdout": subprocess.PIPE,
-            "stderr": subprocess.STDOUT,
-        }
-        if stdin_data is not None:
-            kw["input"] = stdin_data
-        else:
-            kw["stdin"] = subprocess.DEVNULL
-        return kw
-
-    def execute_oneshot(self, command: str, cwd: str = "", *,
-                        timeout: int | None = None,
-                        stdin_data: str | None = None) -> dict:
-        """Execute a command bypassing any persistent shell.
-
-        Safe for concurrent use alongside a long-running execute() call.
-        Backends that maintain a persistent shell (SSH, Local) override this
-        to route through their oneshot path, avoiding the shell lock.
-        Non-persistent backends delegate to execute().
-        """
-        return self.execute(command, cwd=cwd, timeout=timeout,
-                            stdin_data=stdin_data)
-
-    def _timeout_result(self, timeout: int | None) -> dict:
-        """Standard return dict when a command times out."""
-        return {
-            "output": f"Command timed out after {timeout or self.timeout}s",
-            "returncode": 124,
-        }
diff --git a/tools/environments/daytona.py b/tools/environments/daytona.py
index 011a640e3c3..e94c0c4f902 100644
--- a/tools/environments/daytona.py
+++ b/tools/environments/daytona.py
@@ -7,10 +7,8 @@ and resumed on next creation, preserving the filesystem across sessions.
 
 import logging
 import math
-import os
 import shlex
 import threading
-import uuid
 import warnings
 from pathlib import Path
 from typing import Dict, Optional
@@ -20,58 +18,6 @@ from tools.environments.base import BaseEnvironment
 
 logger = logging.getLogger(__name__)
 
-class _DaytonaProcessHandle:
-    """Adapter making Daytona's blocking SDK exec look like Popen."""
-
-    def __init__(self, sandbox, cmd_string, cwd, timeout):
-        self._done = threading.Event()
-        self._returncode = None
-        self._read_fd, self._write_fd = os.pipe()
-        self.stdout = os.fdopen(self._read_fd, "r")
-        self.stdin = None
-
-        # Wrap with shell timeout (Daytona SDK timeout is unreliable)
-        timed_cmd = f"timeout {timeout} bash -c {shlex.quote(cmd_string)}"
-
-        def _run():
-            try:
-                response = sandbox.process.exec(timed_cmd, cwd=cwd)
-                writer = os.fdopen(self._write_fd, "w")
-                writer.write(response.result or "")
-                writer.close()
-                self._returncode = response.exit_code
-            except Exception as e:
-                try:
-                    writer = os.fdopen(self._write_fd, "w")
-                    writer.write(f"Daytona execution error: {e}")
-                    writer.close()
-                except Exception:
-                    try:
-                        os.close(self._write_fd)
-                    except Exception:
-                        pass
-                self._returncode = 1
-            finally:
-                self._done.set()
-
-        self._thread = threading.Thread(target=_run, daemon=True)
-        self._thread.start()
-
-    def poll(self):
-        return self._returncode if self._done.is_set() else None
-
-    def kill(self):
-        pass
-
-    def wait(self, timeout=None):
-        self._done.wait(timeout=timeout)
-        return self._returncode
-
-    @property
-    def returncode(self):
-        return self._returncode
-
-
 class DaytonaEnvironment(BaseEnvironment):
     """Daytona cloud sandbox execution backend.
 
@@ -79,6 +25,8 @@
     instead of snapshots, making it faster and stateless on the host.
     """
 
+    _snapshot_timeout = 60  # Daytona sandbox startup can be slow
+
     def __init__(
         self,
         image: str,
@@ -245,21 +193,33 @@
         self._ensure_sandbox_ready()
         self._sync_skills_and_credentials()
 
-    def _run_bash(self, cmd_string: str, *, stdin_data: str | None = None):
-        """Spawn ``bash -c <cmd_string>`` inside the Daytona sandbox.
-
-        Returns a _DaytonaProcessHandle (satisfies the ProcessHandle protocol).
-        stdin_data is embedded as a heredoc since Daytona cannot pipe stdin.
-        """
+    def _run_bash(self, cmd_string: str, *, timeout: int | None = None,
+                  stdin_data: str | None = None):
+        """Spawn ``bash -c <cmd_string>`` inside the Daytona sandbox."""
+        effective_timeout = timeout or self.timeout
         if stdin_data is not None:
-            marker = f"HERMES_EOF_{uuid.uuid4().hex[:8]}"
-            while marker in stdin_data:
-                marker = f"HERMES_EOF_{uuid.uuid4().hex[:8]}"
-            cmd_string = f"{cmd_string} << '{marker}'\n{stdin_data}\n{marker}"
-        effective_cwd = self.cwd or None
-        return _DaytonaProcessHandle(
-            self._sandbox, cmd_string, effective_cwd, self.timeout,
-        )
+            cmd_string = self._embed_stdin_heredoc(cmd_string, stdin_data)
+        return self._daytona_exec(cmd_string, effective_timeout, login=False)
+
+    def _run_bash_login(self, cmd_string: str, *,
+                        timeout: int | None = None):
+        """Spawn ``bash -l -c <cmd_string>`` for snapshot creation."""
+        effective_timeout = timeout or self._snapshot_timeout
+        return self._daytona_exec(cmd_string, effective_timeout, login=True)
+
+    def _daytona_exec(self, cmd_string: str, timeout: int, login: bool):
+        """Create a _ThreadedProcessHandle wrapping Daytona's blocking exec."""
+        from tools.environments.base import _ThreadedProcessHandle
+        sandbox = self._sandbox
+
+        shell_flag = "-l -c" if login else "-c"
+        timed_cmd = f"timeout {timeout} bash {shell_flag} {shlex.quote(cmd_string)}"
+
+        def exec_fn():
+            response = sandbox.process.exec(timed_cmd)
+            return response.result or "", response.exit_code
+
+        return _ThreadedProcessHandle(exec_fn)
 
     # ------------------------------------------------------------------
     # Lifecycle
diff --git a/tools/environments/modal.py b/tools/environments/modal.py
index 984bde313b9..832012d1e8f 100644
--- a/tools/environments/modal.py
+++ b/tools/environments/modal.py
@@ -7,10 +7,9 @@ wrapper, while preserving Hermes' persistent snapshot behavior across sessions.
 import asyncio
 import json
 import logging
-import os
+import re
 import shlex
 import threading
-import uuid
 from pathlib import Path
 from typing import Any, Dict, Optional
 
@@ -22,6 +21,13 @@ logger = logging.getLogger(__name__)
 
 _SNAPSHOT_STORE = get_hermes_home() / "modal_snapshots.json"
 _DIRECT_SNAPSHOT_NAMESPACE = "direct"
 
+# Matches official Python Docker Hub tags: python:3.12, python:3.12-slim,
+# python:3.12.3-bookworm, etc.  Rejects alpine (not Debian-based) and
+# ambiguous tags like python:3 or python:latest.
+_PYTHON_IMAGE_RE = re.compile(
+    r"^python:(\d+\.\d+)(?:\.\d+)?(?:-(slim|bookworm|bullseye|slim-bookworm|slim-bullseye))?$"
+)
+
 def _load_snapshots() -> Dict[str, str]:
     """Load snapshot ID mapping from disk."""
@@ -85,7 +91,13 @@ def _delete_direct_snapshot(task_id: str, snapshot_id: str | None = None) -> Non
 
 
 def _resolve_modal_image(image_spec: Any) -> Any:
-    """Convert registry references or snapshot ids into Modal image objects."""
+    """Convert registry references or snapshot ids into Modal image objects.
+
+    Official ``python:X.Y*`` images are mapped to
+    ``modal.Image.debian_slim(python_version=X.Y)`` because the stock
+    Docker Hub images lack build tools (gcc/g++) required by Modal's
+    internal pip bootstrap.
+    """
     import modal as _modal
 
     if not isinstance(image_spec, str):
@@ -94,6 +106,10 @@
     if image_spec.startswith("im-"):
         return _modal.Image.from_id(image_spec)
 
+    m = _PYTHON_IMAGE_RE.match(image_spec)
+    if m:
+        return _modal.Image.debian_slim(python_version=m.group(1))
+
     return _modal.Image.from_registry(
         image_spec,
         setup_dockerfile_commands=[
@@ -135,75 +151,6 @@
         self._thread.join(timeout=10)
 
 
-class _ModalProcessHandle:
-    """Adapter making Modal's async sandbox.exec look like Popen."""
-
-    def __init__(self, worker, sandbox, cmd_string, timeout):
-        self._done = threading.Event()
-        self._returncode = None
-        self._read_fd, self._write_fd = os.pipe()
-        self.stdout = os.fdopen(self._read_fd, "r")
-        self.stdin = None
-
-        def _run():
-            try:
-                async def _exec():
-                    process = await sandbox.exec.aio(
-                        "bash", "-c", cmd_string, timeout=timeout,
-                    )
-                    stdout = await process.stdout.read.aio()
-                    stderr = await process.stderr.read.aio()
-                    exit_code = await process.wait.aio()
-                    return stdout, stderr, exit_code
-
-                stdout, stderr, exit_code = worker.run_coroutine(
-                    _exec(), timeout=timeout + 30,
-                )
-                writer = os.fdopen(self._write_fd, "w")
-                if stdout:
-                    writer.write(
-                        stdout if isinstance(stdout, str)
-                        else stdout.decode("utf-8", errors="replace")
-                    )
-                if stderr:
-                    writer.write(
-                        stderr if isinstance(stderr, str)
-                        else stderr.decode("utf-8", errors="replace")
-                    )
-                writer.close()
-                self._returncode = exit_code
-            except Exception as e:
-                try:
-                    writer = os.fdopen(self._write_fd, "w")
-                    writer.write(f"Modal execution error: {e}")
-                    writer.close()
-                except Exception:
-                    try:
-                        os.close(self._write_fd)
-                    except Exception:
-                        pass
-                self._returncode = 1
-            finally:
-                self._done.set()
-
-        self._thread = threading.Thread(target=_run, daemon=True)
-        self._thread.start()
-
-    def poll(self):
-        return self._returncode if self._done.is_set() else None
-
-    def kill(self):
-        pass  # Handled by sandbox termination in environment
-
-    def wait(self, timeout=None):
-        self._done.wait(timeout=timeout)
-        return self._returncode
-
-    @property
-    def returncode(self):
-        return self._returncode
-
-
 class ModalEnvironment(BaseEnvironment):
     """Modal cloud execution via native Modal sandboxes.
 
@@ -212,6 +159,8 @@
     satisfying the ProcessHandle protocol for BaseEnvironment._wait_for_process().
     """
 
+    _snapshot_timeout = 60  # Modal sandbox cold start can be slow
+
     def __init__(
         self,
         image: str,
@@ -297,10 +246,10 @@
         try:
             target_image_spec = restored_snapshot_id or image
             try:
-                # _resolve_modal_image keeps the Modal bootstrap fix together:
-                # it applies setup_dockerfile_commands with ensurepip before
-                # Modal builds registry images, while snapshot ids restore via
-                # modal.Image.from_id() without rebuilding.
+                # _resolve_modal_image routes python:X.Y images to
+                # debian_slim (avoiding build-tool failures) and applies
+                # setup_dockerfile_commands with ensurepip for other
+                # registry images.  Snapshot ids restore via from_id().
                 effective_image = _resolve_modal_image(target_image_spec)
                 self._app, self._sandbox = self._worker.run_coroutine(
                     _create_sandbox(effective_image),
@@ -400,20 +349,44 @@
     def _before_execute(self) -> None:
         self._sync_files()
 
-    def _run_bash(self, cmd_string: str, *, stdin_data: str | None = None):
-        """Spawn ``bash -c <cmd_string>`` inside the Modal sandbox.
-
-        Returns a _ModalProcessHandle (satisfies the ProcessHandle protocol).
-        stdin_data is embedded as a heredoc since Modal cannot pipe stdin.
-        """
+    def _run_bash(self, cmd_string: str, *, timeout: int | None = None,
+                  stdin_data: str | None = None):
+        """Spawn ``bash -c <cmd_string>`` inside the Modal sandbox."""
+        effective_timeout = timeout or self.timeout
         if stdin_data is not None:
-            marker = f"HERMES_EOF_{uuid.uuid4().hex[:8]}"
-            while marker in stdin_data:
-                marker = f"HERMES_EOF_{uuid.uuid4().hex[:8]}"
-            cmd_string = f"{cmd_string} << '{marker}'\n{stdin_data}\n{marker}"
-        return _ModalProcessHandle(
-            self._worker, self._sandbox, cmd_string, self.timeout,
-        )
+            cmd_string = self._embed_stdin_heredoc(cmd_string, stdin_data)
+        return self._modal_exec(cmd_string, effective_timeout, login=False)
+
+    def _run_bash_login(self, cmd_string: str, *,
+                        timeout: int | None = None):
+        """Spawn ``bash -l -c <cmd_string>`` for snapshot creation."""
+        effective_timeout = timeout or self._snapshot_timeout
+        return self._modal_exec(cmd_string, effective_timeout, login=True)
+
+    def _modal_exec(self, cmd_string: str, timeout: int, login: bool):
+        """Create a _ThreadedProcessHandle wrapping Modal's async exec."""
+        from tools.environments.base import _ThreadedProcessHandle
+        worker = self._worker
+        sandbox = self._sandbox
+
+        def exec_fn():
+            async def _exec():
+                args = ["bash", "-l", "-c", cmd_string] if login else ["bash", "-c", cmd_string]
+                process = await sandbox.exec.aio(*args, timeout=timeout)
+                stdout = await process.stdout.read.aio()
+                stderr = await process.stderr.read.aio()
+                exit_code = await process.wait.aio()
+                return stdout, stderr, exit_code
+
+            stdout, stderr, exit_code = worker.run_coroutine(
+                _exec(), timeout=timeout + 30,
+            )
+            output = stdout if isinstance(stdout, str) else stdout.decode("utf-8", errors="replace")
+            if stderr:
+                output += stderr if isinstance(stderr, str) else stderr.decode("utf-8", errors="replace")
+            return output, exit_code
+
+        return _ThreadedProcessHandle(exec_fn)
 
     # ------------------------------------------------------------------
    # Lifecycle