diff --git a/patcher.py b/patcher.py deleted file mode 100644 index 24cb3fa..0000000 --- a/patcher.py +++ /dev/null @@ -1,278 +0,0 @@ -#!/usr/bin/env python3 -# this module is part of undetected_chromedriver - -import io -import logging -import os -import random -import re -import string -import sys -import time -import zipfile -from distutils.version import LooseVersion -from urllib.request import urlopen, urlretrieve -import secrets - - -logger = logging.getLogger(__name__) - -IS_POSIX = sys.platform.startswith(("darwin", "cygwin", "linux")) - - -class Patcher(object): - url_repo = "https://chromedriver.storage.googleapis.com" - zip_name = "chromedriver_%s.zip" - exe_name = "chromedriver%s" - - platform = sys.platform - if platform.endswith("win32"): - zip_name %= "win32" - exe_name %= ".exe" - if platform.endswith("linux"): - zip_name %= "linux64" - exe_name %= "" - if platform.endswith("darwin"): - zip_name %= "mac64" - exe_name %= "" - - if platform.endswith("win32"): - d = "~/appdata/roaming/undetected_chromedriver" - elif 'LAMBDA_TASK_ROOT' in os.environ: - d = "/tmp/undetected_chromedriver" - elif platform.startswith("linux"): - d = "~/.local/share/undetected_chromedriver" - elif platform.endswith("darwin"): - d = "~/Library/Application Support/undetected_chromedriver" - else: - d = "~/.undetected_chromedriver" - data_path = os.path.abspath(os.path.expanduser(d)) - - def __init__(self, executable_path=None, force=False, version_main: int = 0): - """ - - Args: - executable_path: None = automatic - a full file path to the chromedriver executable - force: False - terminate processes which are holding lock - version_main: 0 = auto - specify main chrome version (rounded, ex: 82) - """ - - self.force = force - self.executable_path = None - prefix = secrets.token_hex(8) - - if not os.path.exists(self.data_path): - os.makedirs(self.data_path, exist_ok=True) - - if not executable_path: - self.executable_path = os.path.join( - self.data_path, "_".join([prefix, self.exe_name]) - ) - - if not IS_POSIX: - if executable_path: - if not executable_path[-4:] == ".exe": - executable_path += ".exe" - - self.zip_path = os.path.join(self.data_path, prefix) - - if not executable_path: - self.executable_path = os.path.abspath( - os.path.join(".", self.executable_path) - ) - - self._custom_exe_path = False - - if executable_path: - self._custom_exe_path = True - self.executable_path = executable_path - self.version_main = version_main - self.version_full = None - - def auto(self, executable_path=None, force=False, version_main=None): - """""" - if executable_path: - self.executable_path = executable_path - self._custom_exe_path = True - - if self._custom_exe_path: - ispatched = self.is_binary_patched(self.executable_path) - if not ispatched: - return self.patch_exe() - else: - return - - if version_main: - self.version_main = version_main - if force is True: - self.force = force - - try: - os.unlink(self.executable_path) - except PermissionError: - if self.force: - self.force_kill_instances(self.executable_path) - return self.auto(force=not self.force) - try: - if self.is_binary_patched(): - # assumes already running AND patched - return True - except PermissionError: - pass - # return False - except FileNotFoundError: - pass - - release = self.fetch_release_number() - self.version_main = release.version[0] - self.version_full = release - self.unzip_package(self.fetch_package()) - return self.patch() - - def patch(self): - self.patch_exe() - return self.is_binary_patched() - - def fetch_release_number(self): - """ - Gets the latest major version available, or the latest major version of self.target_version if set explicitly. - :return: version string - :rtype: LooseVersion - """ - path = "/latest_release" - if self.version_main: - path += f"_{self.version_main}" - path = path.upper() - logger.debug("getting release number from %s" % path) - return LooseVersion(urlopen(self.url_repo + path).read().decode()) - - def parse_exe_version(self): - with io.open(self.executable_path, "rb") as f: - for line in iter(lambda: f.readline(), b""): - match = re.search(rb"platform_handle\x00content\x00([0-9.]*)", line) - if match: - return LooseVersion(match[1].decode()) - - def fetch_package(self): - """ - Downloads ChromeDriver from source - - :return: path to downloaded file - """ - u = "%s/%s/%s" % (self.url_repo, self.version_full.vstring, self.zip_name) - logger.debug("downloading from %s" % u) - # return urlretrieve(u, filename=self.data_path)[0] - return urlretrieve(u)[0] - - def unzip_package(self, fp): - """ - Does what it says - - :return: path to unpacked executable - """ - logger.debug("unzipping %s" % fp) - try: - os.unlink(self.zip_path) - except (FileNotFoundError, OSError): - pass - - os.makedirs(self.zip_path, mode=0o755, exist_ok=True) - with zipfile.ZipFile(fp, mode="r") as zf: - zf.extract(self.exe_name, self.zip_path) - os.rename(os.path.join(self.zip_path, self.exe_name), self.executable_path) - os.remove(fp) - os.rmdir(self.zip_path) - os.chmod(self.executable_path, 0o755) - return self.executable_path - - @staticmethod - def force_kill_instances(exe_name): - """ - kills running instances. - :param: executable name to kill, may be a path as well - - :return: True on success else False - """ - exe_name = os.path.basename(exe_name) - if IS_POSIX: - r = os.system("kill -f -9 $(pidof %s)" % exe_name) - else: - r = os.system("taskkill /f /im %s" % exe_name) - return not r - - @staticmethod - def gen_random_cdc(): - cdc = random.choices(string.ascii_lowercase, k=26) - cdc[-6:-4] = map(str.upper, cdc[-6:-4]) - cdc[2] = cdc[0] - cdc[3] = "_" - return "".join(cdc).encode() - - def is_binary_patched(self, executable_path=None): - """simple check if executable is patched. - - :return: False if not patched, else True - """ - executable_path = executable_path or self.executable_path - with io.open(executable_path, "rb") as fh: - for line in iter(lambda: fh.readline(), b""): - if b"cdc_" in line: - return False - else: - return True - - def patch_exe(self): - """ - Patches the ChromeDriver binary - - :return: False on failure, binary name on success - """ - logger.info("patching driver executable %s" % self.executable_path) - - linect = 0 - replacement = self.gen_random_cdc() - with io.open(self.executable_path, "r+b") as fh: - for line in iter(lambda: fh.readline(), b""): - if b"cdc_" in line: - fh.seek(-len(line), 1) - newline = re.sub(b"cdc_.{22}", replacement, line) - fh.write(newline) - linect += 1 - return linect - - def __repr__(self): - return "{0:s}({1:s})".format( - self.__class__.__name__, - self.executable_path, - ) - - def __del__(self): - - if self._custom_exe_path: - # if the driver binary is specified by user - # we assume it is important enough to not delete it - return - else: - timeout = 3 # stop trying after this many seconds - t = time.monotonic() - while True: - now = time.monotonic() - if now - t > timeout: - # we don't want to wait until the end of time - logger.debug( - "could not unlink %s in time (%d seconds)" - % (self.executable_path, timeout) - ) - break - try: - os.unlink(self.executable_path) - logger.debug("successfully unlinked %s" % self.executable_path) - break - except (OSError, RuntimeError, PermissionError): - time.sleep(0.1) - continue - except FileNotFoundError: - break diff --git a/setup.py b/setup.py index bf688f1..b5a7ca9 100644 --- a/setup.py +++ b/setup.py @@ -60,4 +60,4 @@ setup( "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", ], -) +) \ No newline at end of file