path for aws lambda
This commit is contained in:
parent
b2e804e977
commit
f386a5b743
|
@ -0,0 +1,278 @@
|
|||
#!/usr/bin/env python3
|
||||
# this module is part of undetected_chromedriver
|
||||
|
||||
import io
|
||||
import logging
|
||||
import os
|
||||
import random
|
||||
import re
|
||||
import string
|
||||
import sys
|
||||
import time
|
||||
import zipfile
|
||||
from distutils.version import LooseVersion
|
||||
from urllib.request import urlopen, urlretrieve
|
||||
import secrets
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
IS_POSIX = sys.platform.startswith(("darwin", "cygwin", "linux"))
|
||||
|
||||
|
||||
class Patcher(object):
|
||||
url_repo = "https://chromedriver.storage.googleapis.com"
|
||||
zip_name = "chromedriver_%s.zip"
|
||||
exe_name = "chromedriver%s"
|
||||
|
||||
platform = sys.platform
|
||||
if platform.endswith("win32"):
|
||||
zip_name %= "win32"
|
||||
exe_name %= ".exe"
|
||||
if platform.endswith("linux"):
|
||||
zip_name %= "linux64"
|
||||
exe_name %= ""
|
||||
if platform.endswith("darwin"):
|
||||
zip_name %= "mac64"
|
||||
exe_name %= ""
|
||||
|
||||
if platform.endswith("win32"):
|
||||
d = "~/appdata/roaming/undetected_chromedriver"
|
||||
elif 'LAMBDA_TASK_ROOT' in os.environ:
|
||||
d = "/tmp/undetected_chromedriver"
|
||||
elif platform.startswith("linux"):
|
||||
d = "~/.local/share/undetected_chromedriver"
|
||||
elif platform.endswith("darwin"):
|
||||
d = "~/Library/Application Support/undetected_chromedriver"
|
||||
else:
|
||||
d = "~/.undetected_chromedriver"
|
||||
data_path = os.path.abspath(os.path.expanduser(d))
|
||||
|
||||
def __init__(self, executable_path=None, force=False, version_main: int = 0):
|
||||
"""
|
||||
|
||||
Args:
|
||||
executable_path: None = automatic
|
||||
a full file path to the chromedriver executable
|
||||
force: False
|
||||
terminate processes which are holding lock
|
||||
version_main: 0 = auto
|
||||
specify main chrome version (rounded, ex: 82)
|
||||
"""
|
||||
|
||||
self.force = force
|
||||
self.executable_path = None
|
||||
prefix = secrets.token_hex(8)
|
||||
|
||||
if not os.path.exists(self.data_path):
|
||||
os.makedirs(self.data_path, exist_ok=True)
|
||||
|
||||
if not executable_path:
|
||||
self.executable_path = os.path.join(
|
||||
self.data_path, "_".join([prefix, self.exe_name])
|
||||
)
|
||||
|
||||
if not IS_POSIX:
|
||||
if executable_path:
|
||||
if not executable_path[-4:] == ".exe":
|
||||
executable_path += ".exe"
|
||||
|
||||
self.zip_path = os.path.join(self.data_path, prefix)
|
||||
|
||||
if not executable_path:
|
||||
self.executable_path = os.path.abspath(
|
||||
os.path.join(".", self.executable_path)
|
||||
)
|
||||
|
||||
self._custom_exe_path = False
|
||||
|
||||
if executable_path:
|
||||
self._custom_exe_path = True
|
||||
self.executable_path = executable_path
|
||||
self.version_main = version_main
|
||||
self.version_full = None
|
||||
|
||||
def auto(self, executable_path=None, force=False, version_main=None):
|
||||
""""""
|
||||
if executable_path:
|
||||
self.executable_path = executable_path
|
||||
self._custom_exe_path = True
|
||||
|
||||
if self._custom_exe_path:
|
||||
ispatched = self.is_binary_patched(self.executable_path)
|
||||
if not ispatched:
|
||||
return self.patch_exe()
|
||||
else:
|
||||
return
|
||||
|
||||
if version_main:
|
||||
self.version_main = version_main
|
||||
if force is True:
|
||||
self.force = force
|
||||
|
||||
try:
|
||||
os.unlink(self.executable_path)
|
||||
except PermissionError:
|
||||
if self.force:
|
||||
self.force_kill_instances(self.executable_path)
|
||||
return self.auto(force=not self.force)
|
||||
try:
|
||||
if self.is_binary_patched():
|
||||
# assumes already running AND patched
|
||||
return True
|
||||
except PermissionError:
|
||||
pass
|
||||
# return False
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
|
||||
release = self.fetch_release_number()
|
||||
self.version_main = release.version[0]
|
||||
self.version_full = release
|
||||
self.unzip_package(self.fetch_package())
|
||||
return self.patch()
|
||||
|
||||
def patch(self):
|
||||
self.patch_exe()
|
||||
return self.is_binary_patched()
|
||||
|
||||
def fetch_release_number(self):
|
||||
"""
|
||||
Gets the latest major version available, or the latest major version of self.target_version if set explicitly.
|
||||
:return: version string
|
||||
:rtype: LooseVersion
|
||||
"""
|
||||
path = "/latest_release"
|
||||
if self.version_main:
|
||||
path += f"_{self.version_main}"
|
||||
path = path.upper()
|
||||
logger.debug("getting release number from %s" % path)
|
||||
return LooseVersion(urlopen(self.url_repo + path).read().decode())
|
||||
|
||||
def parse_exe_version(self):
|
||||
with io.open(self.executable_path, "rb") as f:
|
||||
for line in iter(lambda: f.readline(), b""):
|
||||
match = re.search(rb"platform_handle\x00content\x00([0-9.]*)", line)
|
||||
if match:
|
||||
return LooseVersion(match[1].decode())
|
||||
|
||||
def fetch_package(self):
|
||||
"""
|
||||
Downloads ChromeDriver from source
|
||||
|
||||
:return: path to downloaded file
|
||||
"""
|
||||
u = "%s/%s/%s" % (self.url_repo, self.version_full.vstring, self.zip_name)
|
||||
logger.debug("downloading from %s" % u)
|
||||
# return urlretrieve(u, filename=self.data_path)[0]
|
||||
return urlretrieve(u)[0]
|
||||
|
||||
def unzip_package(self, fp):
|
||||
"""
|
||||
Does what it says
|
||||
|
||||
:return: path to unpacked executable
|
||||
"""
|
||||
logger.debug("unzipping %s" % fp)
|
||||
try:
|
||||
os.unlink(self.zip_path)
|
||||
except (FileNotFoundError, OSError):
|
||||
pass
|
||||
|
||||
os.makedirs(self.zip_path, mode=0o755, exist_ok=True)
|
||||
with zipfile.ZipFile(fp, mode="r") as zf:
|
||||
zf.extract(self.exe_name, self.zip_path)
|
||||
os.rename(os.path.join(self.zip_path, self.exe_name), self.executable_path)
|
||||
os.remove(fp)
|
||||
os.rmdir(self.zip_path)
|
||||
os.chmod(self.executable_path, 0o755)
|
||||
return self.executable_path
|
||||
|
||||
@staticmethod
|
||||
def force_kill_instances(exe_name):
|
||||
"""
|
||||
kills running instances.
|
||||
:param: executable name to kill, may be a path as well
|
||||
|
||||
:return: True on success else False
|
||||
"""
|
||||
exe_name = os.path.basename(exe_name)
|
||||
if IS_POSIX:
|
||||
r = os.system("kill -f -9 $(pidof %s)" % exe_name)
|
||||
else:
|
||||
r = os.system("taskkill /f /im %s" % exe_name)
|
||||
return not r
|
||||
|
||||
@staticmethod
|
||||
def gen_random_cdc():
|
||||
cdc = random.choices(string.ascii_lowercase, k=26)
|
||||
cdc[-6:-4] = map(str.upper, cdc[-6:-4])
|
||||
cdc[2] = cdc[0]
|
||||
cdc[3] = "_"
|
||||
return "".join(cdc).encode()
|
||||
|
||||
def is_binary_patched(self, executable_path=None):
|
||||
"""simple check if executable is patched.
|
||||
|
||||
:return: False if not patched, else True
|
||||
"""
|
||||
executable_path = executable_path or self.executable_path
|
||||
with io.open(executable_path, "rb") as fh:
|
||||
for line in iter(lambda: fh.readline(), b""):
|
||||
if b"cdc_" in line:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
def patch_exe(self):
|
||||
"""
|
||||
Patches the ChromeDriver binary
|
||||
|
||||
:return: False on failure, binary name on success
|
||||
"""
|
||||
logger.info("patching driver executable %s" % self.executable_path)
|
||||
|
||||
linect = 0
|
||||
replacement = self.gen_random_cdc()
|
||||
with io.open(self.executable_path, "r+b") as fh:
|
||||
for line in iter(lambda: fh.readline(), b""):
|
||||
if b"cdc_" in line:
|
||||
fh.seek(-len(line), 1)
|
||||
newline = re.sub(b"cdc_.{22}", replacement, line)
|
||||
fh.write(newline)
|
||||
linect += 1
|
||||
return linect
|
||||
|
||||
def __repr__(self):
|
||||
return "{0:s}({1:s})".format(
|
||||
self.__class__.__name__,
|
||||
self.executable_path,
|
||||
)
|
||||
|
||||
def __del__(self):
|
||||
|
||||
if self._custom_exe_path:
|
||||
# if the driver binary is specified by user
|
||||
# we assume it is important enough to not delete it
|
||||
return
|
||||
else:
|
||||
timeout = 3 # stop trying after this many seconds
|
||||
t = time.monotonic()
|
||||
while True:
|
||||
now = time.monotonic()
|
||||
if now - t > timeout:
|
||||
# we don't want to wait until the end of time
|
||||
logger.debug(
|
||||
"could not unlink %s in time (%d seconds)"
|
||||
% (self.executable_path, timeout)
|
||||
)
|
||||
break
|
||||
try:
|
||||
os.unlink(self.executable_path)
|
||||
logger.debug("successfully unlinked %s" % self.executable_path)
|
||||
break
|
||||
except (OSError, RuntimeError, PermissionError):
|
||||
time.sleep(0.1)
|
||||
continue
|
||||
except FileNotFoundError:
|
||||
break
|
Loading…
Reference in New Issue