File size: 6,775 Bytes
658460c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
"""Windows-specific implementation of process utilities.

This file is only meant to be imported by process.py, not by end-users.
"""

import ctypes
import os
import subprocess
import sys
import time
from ctypes import POINTER, c_int
from ctypes.wintypes import HLOCAL, LPCWSTR
from subprocess import STDOUT
from threading import Thread
from types import TracebackType
from typing import List, Optional

from . import py3compat
from ._process_common import arg_split as py_arg_split

from ._process_common import process_handler, read_no_interrupt
from .encoding import DEFAULT_ENCODING


class AvoidUNCPath:
    """A context manager to protect command execution from UNC paths.

    In the Win32 API, commands can't be invoked with the cwd being a UNC path.
    This context manager temporarily changes directory to the 'C:' drive on
    entering, and restores the original working directory on exit.

    The context manager returns the starting working directory *if* it made a
    change and None otherwise, so that users can apply the necessary adjustment
    to their system calls in the event of a change.

    Examples
    --------
    ::
        cmd = 'dir'
        with AvoidUNCPath() as path:
            if path is not None:
                cmd = '"pushd %s &&"%s' % (path, cmd)
            os.system(cmd)
    """

    def __enter__(self) -> Optional[str]:
        self.path = os.getcwd()
        self.is_unc_path = self.path.startswith(r"\\")
        if self.is_unc_path:
            # change to c drive (as cmd.exe cannot handle UNC addresses)
            os.chdir("C:")
            return self.path
        else:
            # We return None to signal that there was no change in the working
            # directory
            return None

    def __exit__(
        self,
        exc_type: Optional[type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: TracebackType,
    ) -> None:
        if self.is_unc_path:
            os.chdir(self.path)


def _system_body(p: subprocess.Popen) -> int:
    """Callback for _system."""
    enc = DEFAULT_ENCODING

    # Dec 2024: in both of these functions, I'm not sure why we .splitlines()
    # the bytes and then decode each line individually instead of just decoding
    # the whole thing at once.
    def stdout_read() -> None:
        try:
            assert p.stdout is not None
            for byte_line in read_no_interrupt(p.stdout).splitlines():
                line = byte_line.decode(enc, "replace")
                print(line, file=sys.stdout)
        except Exception as e:
            print(f"Error reading stdout: {e}", file=sys.stderr)

    def stderr_read() -> None:
        try:
            assert p.stderr is not None
            for byte_line in read_no_interrupt(p.stderr).splitlines():
                line = byte_line.decode(enc, "replace")
                print(line, file=sys.stderr)
        except Exception as e:
            print(f"Error reading stderr: {e}", file=sys.stderr)

    stdout_thread = Thread(target=stdout_read)
    stderr_thread = Thread(target=stderr_read)

    stdout_thread.start()
    stderr_thread.start()

    # Wait to finish for returncode. Unfortunately, Python has a bug where
    # wait() isn't interruptible (https://bugs.python.org/issue28168) so poll in
    # a loop instead of just doing `return p.wait()`
    while True:
        result = p.poll()
        if result is None:
            time.sleep(0.01)
        else:
            break

    # Join the threads to ensure they complete before returning
    stdout_thread.join()
    stderr_thread.join()

    return result


def system(cmd: str) -> Optional[int]:
    """Win32 version of os.system() that works with network shares.

    Note that this implementation returns None, as meant for use in IPython.

    Parameters
    ----------
    cmd : str or list
        A command to be executed in the system shell.

    Returns
    -------
    int : child process' exit code.
    """
    # The controller provides interactivity with both
    # stdin and stdout
    # import _process_win32_controller
    # _process_win32_controller.system(cmd)

    with AvoidUNCPath() as path:
        if path is not None:
            cmd = '"pushd %s &&"%s' % (path, cmd)
        res = process_handler(cmd, _system_body)
        assert isinstance(res, int | type(None))
        return res
    return None


def getoutput(cmd: str) -> str:
    """Return standard output of executing cmd in a shell.

    Accepts the same arguments as os.system().

    Parameters
    ----------
    cmd : str or list
        A command to be executed in the system shell.

    Returns
    -------
    stdout : str
    """

    with AvoidUNCPath() as path:
        if path is not None:
            cmd = '"pushd %s &&"%s' % (path, cmd)
        out = process_handler(cmd, lambda p: p.communicate()[0], STDOUT)

    if out is None:
        out = b""
    return py3compat.decode(out)


try:
    windll = ctypes.windll  # type: ignore [attr-defined]
    CommandLineToArgvW = windll.shell32.CommandLineToArgvW
    CommandLineToArgvW.arg_types = [LPCWSTR, POINTER(c_int)]
    CommandLineToArgvW.restype = POINTER(LPCWSTR)
    LocalFree = windll.kernel32.LocalFree
    LocalFree.res_type = HLOCAL
    LocalFree.arg_types = [HLOCAL]

    def arg_split(
        commandline: str, posix: bool = False, strict: bool = True
    ) -> List[str]:
        """Split a command line's arguments in a shell-like manner.

        This is a special version for windows that use a ctypes call to CommandLineToArgvW
        to do the argv splitting. The posix parameter is ignored.

        If strict=False, process_common.arg_split(...strict=False) is used instead.
        """
        # CommandLineToArgvW returns path to executable if called with empty string.
        if commandline.strip() == "":
            return []
        if not strict:
            # not really a cl-arg, fallback on _process_common
            return py_arg_split(commandline, posix=posix, strict=strict)
        argvn = c_int()
        result_pointer = CommandLineToArgvW(commandline.lstrip(), ctypes.byref(argvn))
        try:
            result_array_type = LPCWSTR * argvn.value
            result = [
                arg
                for arg in result_array_type.from_address(
                    ctypes.addressof(result_pointer.contents)
                )
                if arg is not None
            ]
        finally:
            # for side effects
            _ = LocalFree(result_pointer)
        return result
except AttributeError:
    arg_split = py_arg_split


def check_pid(pid: int) -> bool:
    # OpenProcess returns 0 if no such process (of ours) exists
    # positive int otherwise
    return bool(windll.kernel32.OpenProcess(1, 0, pid))