From f386a5b7431d6f50e90b0cf09c54c43bbdb1b277 Mon Sep 17 00:00:00 2001 From: AktanKasymaliev Date: Mon, 23 May 2022 22:36:31 +0600 Subject: [PATCH] path for aws lambda --- patcher.py | 278 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 278 insertions(+) create mode 100644 patcher.py diff --git a/patcher.py b/patcher.py new file mode 100644 index 0000000..24cb3fa --- /dev/null +++ b/patcher.py @@ -0,0 +1,278 @@ +#!/usr/bin/env python3 +# this module is part of undetected_chromedriver + +import io +import logging +import os +import random +import re +import string +import sys +import time +import zipfile +from distutils.version import LooseVersion +from urllib.request import urlopen, urlretrieve +import secrets + + +logger = logging.getLogger(__name__) + +IS_POSIX = sys.platform.startswith(("darwin", "cygwin", "linux")) + + +class Patcher(object): + url_repo = "https://chromedriver.storage.googleapis.com" + zip_name = "chromedriver_%s.zip" + exe_name = "chromedriver%s" + + platform = sys.platform + if platform.endswith("win32"): + zip_name %= "win32" + exe_name %= ".exe" + if platform.endswith("linux"): + zip_name %= "linux64" + exe_name %= "" + if platform.endswith("darwin"): + zip_name %= "mac64" + exe_name %= "" + + if platform.endswith("win32"): + d = "~/appdata/roaming/undetected_chromedriver" + elif 'LAMBDA_TASK_ROOT' in os.environ: + d = "/tmp/undetected_chromedriver" + elif platform.startswith("linux"): + d = "~/.local/share/undetected_chromedriver" + elif platform.endswith("darwin"): + d = "~/Library/Application Support/undetected_chromedriver" + else: + d = "~/.undetected_chromedriver" + data_path = os.path.abspath(os.path.expanduser(d)) + + def __init__(self, executable_path=None, force=False, version_main: int = 0): + """ + + Args: + executable_path: None = automatic + a full file path to the chromedriver executable + force: False + terminate processes which are holding lock + version_main: 0 = auto + specify main chrome version (rounded, ex: 82) + """ + + self.force = force + self.executable_path = None + prefix = secrets.token_hex(8) + + if not os.path.exists(self.data_path): + os.makedirs(self.data_path, exist_ok=True) + + if not executable_path: + self.executable_path = os.path.join( + self.data_path, "_".join([prefix, self.exe_name]) + ) + + if not IS_POSIX: + if executable_path: + if not executable_path[-4:] == ".exe": + executable_path += ".exe" + + self.zip_path = os.path.join(self.data_path, prefix) + + if not executable_path: + self.executable_path = os.path.abspath( + os.path.join(".", self.executable_path) + ) + + self._custom_exe_path = False + + if executable_path: + self._custom_exe_path = True + self.executable_path = executable_path + self.version_main = version_main + self.version_full = None + + def auto(self, executable_path=None, force=False, version_main=None): + """""" + if executable_path: + self.executable_path = executable_path + self._custom_exe_path = True + + if self._custom_exe_path: + ispatched = self.is_binary_patched(self.executable_path) + if not ispatched: + return self.patch_exe() + else: + return + + if version_main: + self.version_main = version_main + if force is True: + self.force = force + + try: + os.unlink(self.executable_path) + except PermissionError: + if self.force: + self.force_kill_instances(self.executable_path) + return self.auto(force=not self.force) + try: + if self.is_binary_patched(): + # assumes already running AND patched + return True + except PermissionError: + pass + # return False + except FileNotFoundError: + pass + + release = self.fetch_release_number() + self.version_main = release.version[0] + self.version_full = release + self.unzip_package(self.fetch_package()) + return self.patch() + + def patch(self): + self.patch_exe() + return self.is_binary_patched() + + def fetch_release_number(self): + """ + Gets the latest major version available, or the latest major version of self.target_version if set explicitly. + :return: version string + :rtype: LooseVersion + """ + path = "/latest_release" + if self.version_main: + path += f"_{self.version_main}" + path = path.upper() + logger.debug("getting release number from %s" % path) + return LooseVersion(urlopen(self.url_repo + path).read().decode()) + + def parse_exe_version(self): + with io.open(self.executable_path, "rb") as f: + for line in iter(lambda: f.readline(), b""): + match = re.search(rb"platform_handle\x00content\x00([0-9.]*)", line) + if match: + return LooseVersion(match[1].decode()) + + def fetch_package(self): + """ + Downloads ChromeDriver from source + + :return: path to downloaded file + """ + u = "%s/%s/%s" % (self.url_repo, self.version_full.vstring, self.zip_name) + logger.debug("downloading from %s" % u) + # return urlretrieve(u, filename=self.data_path)[0] + return urlretrieve(u)[0] + + def unzip_package(self, fp): + """ + Does what it says + + :return: path to unpacked executable + """ + logger.debug("unzipping %s" % fp) + try: + os.unlink(self.zip_path) + except (FileNotFoundError, OSError): + pass + + os.makedirs(self.zip_path, mode=0o755, exist_ok=True) + with zipfile.ZipFile(fp, mode="r") as zf: + zf.extract(self.exe_name, self.zip_path) + os.rename(os.path.join(self.zip_path, self.exe_name), self.executable_path) + os.remove(fp) + os.rmdir(self.zip_path) + os.chmod(self.executable_path, 0o755) + return self.executable_path + + @staticmethod + def force_kill_instances(exe_name): + """ + kills running instances. + :param: executable name to kill, may be a path as well + + :return: True on success else False + """ + exe_name = os.path.basename(exe_name) + if IS_POSIX: + r = os.system("kill -f -9 $(pidof %s)" % exe_name) + else: + r = os.system("taskkill /f /im %s" % exe_name) + return not r + + @staticmethod + def gen_random_cdc(): + cdc = random.choices(string.ascii_lowercase, k=26) + cdc[-6:-4] = map(str.upper, cdc[-6:-4]) + cdc[2] = cdc[0] + cdc[3] = "_" + return "".join(cdc).encode() + + def is_binary_patched(self, executable_path=None): + """simple check if executable is patched. + + :return: False if not patched, else True + """ + executable_path = executable_path or self.executable_path + with io.open(executable_path, "rb") as fh: + for line in iter(lambda: fh.readline(), b""): + if b"cdc_" in line: + return False + else: + return True + + def patch_exe(self): + """ + Patches the ChromeDriver binary + + :return: False on failure, binary name on success + """ + logger.info("patching driver executable %s" % self.executable_path) + + linect = 0 + replacement = self.gen_random_cdc() + with io.open(self.executable_path, "r+b") as fh: + for line in iter(lambda: fh.readline(), b""): + if b"cdc_" in line: + fh.seek(-len(line), 1) + newline = re.sub(b"cdc_.{22}", replacement, line) + fh.write(newline) + linect += 1 + return linect + + def __repr__(self): + return "{0:s}({1:s})".format( + self.__class__.__name__, + self.executable_path, + ) + + def __del__(self): + + if self._custom_exe_path: + # if the driver binary is specified by user + # we assume it is important enough to not delete it + return + else: + timeout = 3 # stop trying after this many seconds + t = time.monotonic() + while True: + now = time.monotonic() + if now - t > timeout: + # we don't want to wait until the end of time + logger.debug( + "could not unlink %s in time (%d seconds)" + % (self.executable_path, timeout) + ) + break + try: + os.unlink(self.executable_path) + logger.debug("successfully unlinked %s" % self.executable_path) + break + except (OSError, RuntimeError, PermissionError): + time.sleep(0.1) + continue + except FileNotFoundError: + break