undetected-chromedriver/undetected_chromedriver/dprocess.py

75 lines
1.7 KiB
Python
Raw Permalink Normal View History

2022-11-28 15:47:38 -07:00
import atexit
import logging
2021-11-14 04:32:22 -07:00
import multiprocessing
import os
import platform
2022-11-28 15:47:38 -07:00
import signal
2021-11-14 04:32:22 -07:00
from subprocess import PIPE
from subprocess import Popen
2022-11-28 15:47:38 -07:00
import sys
2021-11-14 04:32:22 -07:00
# Process-creation flags, combined and passed as ``creationflags`` to Popen
# when running on Windows.
DETACHED_PROCESS = 0x00000008
CREATE_NEW_PROCESS_GROUP = 0x00000200

# Pids of every process launched through start_detached(); each one is sent
# SIGTERM by _cleanup().
REGISTERED = []
def start_detached(executable, *args):
    """
    Starts a fully independent subprocess (with no parent)

    The executable is launched from a short-lived helper process, which sends
    the pid of the launched process back through a pipe. The pid is also
    appended to REGISTERED.

    :param executable: executable
    :param args: arguments to the executable, eg: ['--param1_key=param1_val', '-vvv' ...]
    :return: pid of the grandchild process
    :raises RuntimeError: if the helper process closes the pipe without
        reporting a pid (for example because the executable could not be started)
    """
    # one-way pipe: the helper process writes the pid, this process reads it
    reader, writer = multiprocessing.Pipe(False)
    try:
        # do not keep reference
        multiprocessing.Process(
            target=_start_detached,
            args=(executable, *args),
            kwargs={"writer": writer},
            daemon=True,
        ).start()

        # Drop this process' copy of the write end before waiting. While it is
        # open, recv() can never observe end-of-file, so a helper that died
        # before sending anything would block this call forever.
        writer.close()

        # receive pid from pipe
        try:
            pid = reader.recv()
        except EOFError as err:
            raise RuntimeError(
                "detached launcher exited without reporting a pid for %r"
                % (executable,)
            ) from err
    finally:
        # close pipes (closing an already closed connection is a no-op)
        writer.close()
        reader.close()

    REGISTERED.append(pid)
    return pid
def _start_detached(executable, *args, writer=None):
    """
    Launches the executable detached from the current process, sends its pid
    through ``writer`` and then exits the current process via sys.exit().

    Used as the target of the helper process created by start_detached().

    :param executable: executable
    :param args: arguments to the executable
    :param writer: writable end of a multiprocessing pipe; receives the pid of
        the launched process
    :raises TypeError: if no writer is given (checked before anything is launched)
    :raises SystemExit: always, after the pid has been sent
    """
    # Fail before launching anything: without a writer the pid could not be
    # reported and the new process would be left untracked.
    if writer is None:
        raise TypeError("_start_detached() requires the 'writer' keyword argument")

    # configure launch
    kwargs = {}
    if platform.system() == "Windows":
        kwargs.update(creationflags=DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP)
    else:
        # assume posix: run the child in a new session
        kwargs.update(start_new_session=True)

    # run
    p = Popen([executable, *args], stdin=PIPE, stdout=PIPE, stderr=PIPE, **kwargs)

    # send pid to pipe
    writer.send(p.pid)

    # nothing left to do here; the caller only needs the pid
    sys.exit()
2021-11-14 04:32:22 -07:00
def _cleanup():
    """
    Sends SIGTERM to every pid in REGISTERED.

    Best effort: a failure for one pid (already gone, not permitted, ...) is
    ignored so the remaining pids are still processed.
    """
    logger = logging.getLogger(__name__)
    for pid in REGISTERED:
        try:
            # lazy %-args: the message is only formatted if debug is enabled
            logger.debug("cleaning up pid %d ", pid)
            os.kill(pid, signal.SIGTERM)
        except Exception:
            # Deliberately ignored, but unlike a bare ``except`` this lets
            # KeyboardInterrupt / SystemExit propagate.
            pass
2021-11-14 04:32:22 -07:00
# Run _cleanup() at interpreter exit so every pid in REGISTERED is sent SIGTERM.
atexit.register(_cleanup)