Spaces:
Running
Running
File size: 6,775 Bytes
658460c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 |
"""Windows-specific implementation of process utilities.
This file is only meant to be imported by process.py, not by end-users.
"""
import ctypes
import os
import subprocess
import sys
import time
from ctypes import POINTER, c_int
from ctypes.wintypes import HLOCAL, LPCWSTR
from subprocess import STDOUT
from threading import Thread
from types import TracebackType
from typing import List, Optional
from . import py3compat
from ._process_common import arg_split as py_arg_split
from ._process_common import process_handler, read_no_interrupt
from .encoding import DEFAULT_ENCODING
class AvoidUNCPath:
"""A context manager to protect command execution from UNC paths.
In the Win32 API, commands can't be invoked with the cwd being a UNC path.
This context manager temporarily changes directory to the 'C:' drive on
entering, and restores the original working directory on exit.
The context manager returns the starting working directory *if* it made a
change and None otherwise, so that users can apply the necessary adjustment
to their system calls in the event of a change.
Examples
--------
::
cmd = 'dir'
with AvoidUNCPath() as path:
if path is not None:
cmd = '"pushd %s &&"%s' % (path, cmd)
os.system(cmd)
"""
def __enter__(self) -> Optional[str]:
self.path = os.getcwd()
self.is_unc_path = self.path.startswith(r"\\")
if self.is_unc_path:
# change to c drive (as cmd.exe cannot handle UNC addresses)
os.chdir("C:")
return self.path
else:
# We return None to signal that there was no change in the working
# directory
return None
def __exit__(
self,
exc_type: Optional[type[BaseException]],
exc_value: Optional[BaseException],
traceback: TracebackType,
) -> None:
if self.is_unc_path:
os.chdir(self.path)
def _system_body(p: subprocess.Popen) -> int:
"""Callback for _system."""
enc = DEFAULT_ENCODING
# Dec 2024: in both of these functions, I'm not sure why we .splitlines()
# the bytes and then decode each line individually instead of just decoding
# the whole thing at once.
def stdout_read() -> None:
try:
assert p.stdout is not None
for byte_line in read_no_interrupt(p.stdout).splitlines():
line = byte_line.decode(enc, "replace")
print(line, file=sys.stdout)
except Exception as e:
print(f"Error reading stdout: {e}", file=sys.stderr)
def stderr_read() -> None:
try:
assert p.stderr is not None
for byte_line in read_no_interrupt(p.stderr).splitlines():
line = byte_line.decode(enc, "replace")
print(line, file=sys.stderr)
except Exception as e:
print(f"Error reading stderr: {e}", file=sys.stderr)
stdout_thread = Thread(target=stdout_read)
stderr_thread = Thread(target=stderr_read)
stdout_thread.start()
stderr_thread.start()
# Wait to finish for returncode. Unfortunately, Python has a bug where
# wait() isn't interruptible (https://bugs.python.org/issue28168) so poll in
# a loop instead of just doing `return p.wait()`
while True:
result = p.poll()
if result is None:
time.sleep(0.01)
else:
break
# Join the threads to ensure they complete before returning
stdout_thread.join()
stderr_thread.join()
return result
def system(cmd: str) -> Optional[int]:
"""Win32 version of os.system() that works with network shares.
Note that this implementation returns None, as meant for use in IPython.
Parameters
----------
cmd : str or list
A command to be executed in the system shell.
Returns
-------
int : child process' exit code.
"""
# The controller provides interactivity with both
# stdin and stdout
# import _process_win32_controller
# _process_win32_controller.system(cmd)
with AvoidUNCPath() as path:
if path is not None:
cmd = '"pushd %s &&"%s' % (path, cmd)
res = process_handler(cmd, _system_body)
assert isinstance(res, int | type(None))
return res
return None
def getoutput(cmd: str) -> str:
"""Return standard output of executing cmd in a shell.
Accepts the same arguments as os.system().
Parameters
----------
cmd : str or list
A command to be executed in the system shell.
Returns
-------
stdout : str
"""
with AvoidUNCPath() as path:
if path is not None:
cmd = '"pushd %s &&"%s' % (path, cmd)
out = process_handler(cmd, lambda p: p.communicate()[0], STDOUT)
if out is None:
out = b""
return py3compat.decode(out)
try:
windll = ctypes.windll # type: ignore [attr-defined]
CommandLineToArgvW = windll.shell32.CommandLineToArgvW
CommandLineToArgvW.arg_types = [LPCWSTR, POINTER(c_int)]
CommandLineToArgvW.restype = POINTER(LPCWSTR)
LocalFree = windll.kernel32.LocalFree
LocalFree.res_type = HLOCAL
LocalFree.arg_types = [HLOCAL]
def arg_split(
commandline: str, posix: bool = False, strict: bool = True
) -> List[str]:
"""Split a command line's arguments in a shell-like manner.
This is a special version for windows that use a ctypes call to CommandLineToArgvW
to do the argv splitting. The posix parameter is ignored.
If strict=False, process_common.arg_split(...strict=False) is used instead.
"""
# CommandLineToArgvW returns path to executable if called with empty string.
if commandline.strip() == "":
return []
if not strict:
# not really a cl-arg, fallback on _process_common
return py_arg_split(commandline, posix=posix, strict=strict)
argvn = c_int()
result_pointer = CommandLineToArgvW(commandline.lstrip(), ctypes.byref(argvn))
try:
result_array_type = LPCWSTR * argvn.value
result = [
arg
for arg in result_array_type.from_address(
ctypes.addressof(result_pointer.contents)
)
if arg is not None
]
finally:
# for side effects
_ = LocalFree(result_pointer)
return result
except AttributeError:
arg_split = py_arg_split
def check_pid(pid: int) -> bool:
# OpenProcess returns 0 if no such process (of ours) exists
# positive int otherwise
return bool(windll.kernel32.OpenProcess(1, 0, pid))
|