From e4e373543fd8a0c9da42312231b209303552dfe5 Mon Sep 17 00:00:00 2001 From: ultrafunkamsterdam Date: Tue, 27 Apr 2021 20:05:34 +0200 Subject: [PATCH] fixed a number of bugs - specifying custom profile - specifying custom binary path - downloading, patching and storing now (if not explicity specified) happens in a writable folder, instead of the current working dir. Committer: UltrafunkAmsterdam --- setup.py | 21 +- tests/test_undetected_chromedriver.py | 36 -- undetected_chromedriver/__init__.py | 2 +- .../tests/test_undetected_chromedriver.py | 55 +++ undetected_chromedriver/v2.py | 329 ++++++++++++------ 5 files changed, 289 insertions(+), 154 deletions(-) delete mode 100644 tests/test_undetected_chromedriver.py create mode 100644 undetected_chromedriver/tests/test_undetected_chromedriver.py diff --git a/setup.py b/setup.py index 6ce07e4..38230e4 100644 --- a/setup.py +++ b/setup.py @@ -12,13 +12,29 @@ Y88b. 888 888 888 Y88..88P 888 888 888 Y8b. Y88b 888 888 888 Y BY ULTRAFUNKAMSTERDAM (https://github.com/ultrafunkamsterdam)""" from setuptools import setup +import os +import re + +with open(os.path.join(os.path.abspath( + os.path.dirname(__file__)), + 'undetected_chromedriver', + '__init__.py'), + mode='r', + encoding='latin1') as fp: + try: + version = re.findall(r"^__version__ = '([^']+)'\r?$", + fp.read(), re.M)[0] + except Exception: + raise RuntimeError("unable to determine version") setup( name="undetected-chromedriver", - version="2.2.1", + version=version, packages=["undetected_chromedriver"], - install_requires=["selenium",], + install_requires=[ + "selenium", + ], url="https://github.com/ultrafunkamsterdam/undetected-chromedriver", license="GPL-3.0", author="UltrafunkAmsterdam", @@ -37,4 +53,3 @@ setup( "Programming Language :: Python :: 3.7", ], ) - diff --git a/tests/test_undetected_chromedriver.py b/tests/test_undetected_chromedriver.py deleted file mode 100644 index c78f97b..0000000 --- a/tests/test_undetected_chromedriver.py +++ /dev/null @@ -1,36 +0,0 @@ -import sys -import os - - -sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) -import time # noqa - - -def test_undetected_chromedriver(): - - import undetected_chromedriver.v2 as uc - driver = uc.Chrome() - - with driver: - driver.get("https://coinfaucet.eu") - time.sleep(4) # sleep only used for timing of screenshot - driver.save_screenshot("coinfaucet.eu.png") - - with driver: - driver.get("https://cia.gov") - time.sleep(4) # sleep only used for timing of screenshot - driver.save_screenshot("cia.gov.png") - - with driver: - driver.get("https://lhcdn.botprotect.io") - time.sleep(4) # sleep only used for timing of screenshot - driver.save_screenshot("notprotect.io.png") - - with driver: - driver.get("https://www.datadome.co") - time.sleep(4) # sleep only used for timing of screenshot - driver.save_screenshot("datadome.co.png") - - -test_undetected_chromedriver() - diff --git a/undetected_chromedriver/__init__.py b/undetected_chromedriver/__init__.py index 362a143..877e551 100644 --- a/undetected_chromedriver/__init__.py +++ b/undetected_chromedriver/__init__.py @@ -31,7 +31,7 @@ from selenium.webdriver import Chrome as _Chrome from selenium.webdriver import ChromeOptions as _ChromeOptions logger = logging.getLogger(__name__) - +__version__ = "2.2.6" TARGET_VERSION = 0 diff --git a/undetected_chromedriver/tests/test_undetected_chromedriver.py b/undetected_chromedriver/tests/test_undetected_chromedriver.py new file mode 100644 index 0000000..24984ca --- /dev/null +++ b/undetected_chromedriver/tests/test_undetected_chromedriver.py @@ -0,0 +1,55 @@ +import logging +import os +import sys +import time # noqa + +from ..v2 import * + +logging.basicConfig(level=10) + +logger = logging.getLogger('TEST') +logger.setLevel(20) + + +def test_quick(): + import undetected_chromedriver.v2 as uc + + print('uc module: ', uc) + # options = selenium.webdriver.ChromeOptions() + options = uc.ChromeOptions() + + options.add_argument('--user-data-dir=c:\\temp') + options.binary_location = uc.find_chrome_executable() + driver = uc.Chrome(executable_path='./chromedriver.exe', options=options, + service_log_path='c:\\temp\\service.log.txt') + while True: + sys.stdin.read() + + +def test_undetected_chromedriver(): + import undetected_chromedriver.v2 as uc + + driver = uc.Chrome() + + with driver: + driver.get("https://coinfaucet.eu") + time.sleep(4) # sleep only used for timing of screenshot + driver.save_screenshot("coinfaucet.eu.png") + + with driver: + driver.get("https://cia.gov") + time.sleep(4) # sleep only used for timing of screenshot + driver.save_screenshot("cia.gov.png") + + with driver: + driver.get("https://lhcdn.botprotect.io") + time.sleep(4) # sleep only used for timing of screenshot + driver.save_screenshot("notprotect.io.png") + + with driver: + driver.get("https://www.datadome.co") + time.sleep(4) # sleep only used for timing of screenshot + driver.save_screenshot("datadome.co.png") + +# test_quick() +# #test_undetected_chromedriver() diff --git a/undetected_chromedriver/v2.py b/undetected_chromedriver/v2.py index dd91af6..91feaf4 100644 --- a/undetected_chromedriver/v2.py +++ b/undetected_chromedriver/v2.py @@ -31,7 +31,6 @@ whats new: """ - from __future__ import annotations import io @@ -44,14 +43,11 @@ import string import subprocess import sys import tempfile -import threading import time import zipfile -import atexit -import contextlib from distutils.version import LooseVersion from urllib.request import urlopen, urlretrieve - +from selenium.webdriver.chrome.options import Options as _ChromeOptions import selenium.webdriver.chrome.service import selenium.webdriver.chrome.webdriver import selenium.webdriver.common.service @@ -62,6 +58,42 @@ __all__ = ("Chrome", "ChromeOptions", "Patcher", "find_chrome_executable") IS_POSIX = sys.platform.startswith(("darwin", "cygwin", "linux")) logger = logging.getLogger("uc") +logger.setLevel(logging.getLogger().getEffectiveLevel()) + + +# +# def get_driver(user_data_dir=None, keep_profile=False, verbose=True, headless=False): +# """ +# +# Args: +# executable_path: +# profile_path: +# keep_profile: +# verbose: +# headless: +# +# Returns: +# +# """ +# log_level = 0 +# +# opts = ChromeOptions() +# if user_data_dir: +# opts.add_argument('--user-data-dir=%s' % user_data_dir) +# +# if headless: +# opts.headless = True +# +# if verbose: +# logging.basicConfig(level=10) +# logger.setLevel(10) +# service_log_path = 'chrome.verbose.log' +# +# else: +# service_log_path = None +# +# return Chrome(options=opts, log_level=log_level, service_log_path=service_log_path, keep_profile=keep_profile) + def find_chrome_executable(): @@ -97,17 +129,16 @@ def find_chrome_executable(): class Chrome(object): - __doc__ = ( """\ - -------------------------------------------------------------------------- - NOTE: - Chrome has everything included to work out of the box. - it does not `need` customizations. - any customizations MAY lead to trigger bot migitation systems. - - -------------------------------------------------------------------------- - """ + -------------------------------------------------------------------------- + NOTE: + Chrome has everything included to work out of the box. + it does not `need` customizations. + any customizations MAY lead to trigger bot migitation systems. + + -------------------------------------------------------------------------- + """ + selenium.webdriver.remote.webdriver.WebDriver.__doc__ ) @@ -123,15 +154,16 @@ class Chrome(object): service_log_path=None, chrome_options=None, keep_alive=True, + keep_profile=None, debug_addr=None, - user_data_dir=None, + log_level=0, factor=1, delay=2, emulate_touch=False, - ): + ): - p = Patcher(target_path=executable_path) - p.auto(False) + p = Patcher.auto(executable_path=executable_path) + # p.auto(False) self._patcher = p self.factor = factor @@ -141,6 +173,7 @@ class Chrome(object): self.browser_args = None self._rcount = 0 self._rdiff = 10 + self.keep_profile = keep_profile try: dbg = debug_addr.split(":") @@ -152,9 +185,6 @@ class Chrome(object): if not debug_addr: debug_addr = f"{debug_host}:{debug_port}" - if not user_data_dir: - user_data_dir = os.path.normpath(tempfile.mkdtemp()) - if not options: options = selenium.webdriver.chrome.webdriver.Options() @@ -167,9 +197,28 @@ class Chrome(object): if not desired_capabilities: desired_capabilities = options.to_capabilities() - self.options = options + + user_data_dir = None + + for arg in options.arguments: + if 'user-data-dir' in arg: + m = re.search('(?:--)?user-data-dir(?:[ =])?(.*)', arg) + try: + user_data_dir = m[1] + logger.debug('user-data-dir found in user argument %s => %s' % (arg, m[1])) + break + except IndexError: + logger.debug('no user data dir could be extracted from supplied argument %s ' % arg) + else: + user_data_dir = os.path.normpath(tempfile.mkdtemp()) + arg = '--user-data-dir=%s' % user_data_dir + options.add_argument(arg) + logger.debug('created a temporary folder in which the user-data (profile) will be stored during this\n' + 'session, and added it to chrome startup arguments: %s' % arg) self.user_data_dir = user_data_dir + self.options = options + extra_args = options.arguments if options.headless: @@ -178,10 +227,9 @@ class Chrome(object): self.browser_args = [ options.binary_location, - "--user-data-dir=%s" % user_data_dir, "--remote-debugging-host=%s" % debug_host, "--remote-debugging-port=%s" % debug_port, - "--log-level=%d" % divmod(logging.getLogger().getEffectiveLevel(), 10)[0], + "--log-level=%d" % log_level or divmod(logging.getLogger().getEffectiveLevel(), 10)[0], *extra_args, ] @@ -194,7 +242,7 @@ class Chrome(object): ) self.webdriver = selenium.webdriver.chrome.webdriver.WebDriver( - executable_path=p.target_path, + # executable_path=p.executable_path, port=port, options=options, service_args=service_args, @@ -336,13 +384,18 @@ class Chrome(object): logger.debug(e, exc_info=True) except Exception: # noqa pass - try: - logger.debug("removing profile : %s" % self.user_data_dir) - shutil.rmtree(self.user_data_dir, ignore_errors=False) - except PermissionError: - logger.debug("permission error. files are still in use/locked. retying...") - time.sleep(1) - self.quit() + if not self.keep_profile or self.keep_profile is False: + for _ in range(3): + try: + logger.debug("removing profile : %s" % self.user_data_dir) + shutil.rmtree(self.user_data_dir, ignore_errors=False) + except FileNotFoundError: + pass + except PermissionError: + logger.debug("permission error. files are still in use/locked. retying...") + else: + break + time.sleep(1) def __del__(self): self.quit() @@ -361,50 +414,102 @@ class Chrome(object): return hash(self.options.debugger_address) -class Patcher(object): - url_repo = "https://chromedriver.storage.googleapis.com" - def __init__( - self, target_path="./chromedriver", force=False, version_main: int = 0 - ): - if not IS_POSIX: - if not target_path[-4:] == ".exe": - target_path += ".exe" +class Patcher(object): + + url_repo = "https://chromedriver.storage.googleapis.com" + zip_name = "chromedriver_%s.zip" + exe_name = "chromedriver%s" + + platform = sys.platform + if platform.endswith("win32"): + zip_name %= "win32" + exe_name %= ".exe" + if platform.endswith("linux"): + zip_name %= "linux64" + exe_name %= "" + if platform.endswith("darwin"): + zip_name %= "mac64" + exe_name %= "" + + if platform.endswith("win32"): + d = "~/appdata/roaming/undetected_chromedriver" + elif platform.startswith("linux"): + d = "~/.local/share/undetected_chromedriver" + elif platform.endswith("darwin"): + d = "~/Library/Application Support/undetected_chromedriver" + else: + d = "~/.undetected_chromedriver" + data_path = os.path.abspath(os.path.expanduser(d)) + + + + def __init__(self, executable_path=None, force=False, version_main: int = 0): + """ + + Args: + executable_path: None = automatic + a full file path to the chromedriver executable + force: False + terminate processes which are holding lock + version_main: 0 = auto + specify main chrome version (rounded, ex: 82) + """ self.force = force - z, e = self.get_package_name() - if not target_path: - target_path = e - self.exename = e - self.target_path = target_path - self.zipname = z + if not executable_path: + executable_path = os.path.join(self.data_path, self.exe_name) + + if not IS_POSIX: + if not executable_path[-4:] == ".exe": + executable_path += ".exe" + + self.zip_path = os.path.join( + self.data_path, self.zip_name) + + self.executable_path = os.path.abspath(os.path.join('.', executable_path)) + self.version_main = version_main self.version_full = None - def auto(self, force=False): + @classmethod + def auto(cls, executable_path='./chromedriver', force=False): + """ + + Args: + force: + + Returns: + + """ + i = cls(executable_path, force=force) try: - os.unlink(self.target_path) + os.unlink(i.executable_path) except PermissionError: - - if force or self.force: - self.force_kill_instances() - return self.auto() - - if self.verify_patch(): - # assumes already running AND patched - return True - return False + if i.force: + cls.force_kill_instances(i.executable_path) + return i.auto(force=False) + try: + if i.is_binary_patched(): + # assumes already running AND patched + return True + except PermissionError: + pass + # return False except FileNotFoundError: pass - release = self.fetch_release_number() - self.version_main = release.version[0] - self.version_full = release - self.fetch_package() - self.unzip_package() + release = i.fetch_release_number() + i.version_main = release.version[0] + i.version_full = release + i.unzip_package(i.fetch_package()) + i.patch() + return i + + def patch(self): self.patch_exe() - return self.verify_patch() + return self.is_binary_patched() def fetch_release_number(self): """ @@ -420,7 +525,7 @@ class Patcher(object): return LooseVersion(urlopen(self.url_repo + path).read().decode()) def parse_exe_version(self): - with io.open(self.target_path, "rb") as f: + with io.open(self.executable_path, "rb") as f: for line in iter(lambda: f.readline(), b""): match = re.search(br"platform_handle\x00content\x00([0-9\.]*)", line) if match: @@ -432,61 +537,49 @@ class Patcher(object): :return: path to downloaded file """ - u = "%s/%s/%s" % (self.url_repo, self.version_full.vstring, self.zipname) + u = "%s/%s/%s" % (self.url_repo, self.version_full.vstring, self.zip_name) logger.debug("downloading from %s" % u) - zp, *_ = urlretrieve(u, filename=self.zipname) - return zp + # return urlretrieve(u, filename=self.data_path)[0] + return urlretrieve(u)[0] - def unzip_package(self): + def unzip_package(self, fp): """ Does what it says :return: path to unpacked executable """ - logger.debug("unzipping %s" % self.zipname) + logger.debug("unzipping %s" % fp) try: - os.makedirs(os.path.dirname(self.target_path), mode=0o755) - except OSError: + os.unlink(self.zip_path) + except (FileNotFoundError, OSError): pass - with zipfile.ZipFile(self.zipname, mode="r") as zf: - zf.extract(self.exename) - os.rename(self.exename, self.target_path) - os.remove(self.zipname) - os.chmod(self.target_path, 0o755) - return self.target_path + + os.makedirs( + self.data_path, + mode=0o755, + exist_ok=True) + + with zipfile.ZipFile(fp, mode="r") as zf: + zf.extract(self.exe_name, os.path.dirname(self.executable_path)) + # os.rename(self.zip_path, self.executable_path) + os.remove(fp) + + os.chmod(self.executable_path, 0o755) + return self.executable_path @staticmethod - def get_package_name(): - """ - returns a tuple of (zipname, exename) depending on platform. - - :return: (zipname, exename) - """ - zipname = "chromedriver_%s.zip" - exe = "chromedriver%s" - platform = sys.platform - if platform.endswith("win32"): - zipname %= "win32" - exe %= ".exe" - if platform.endswith("linux"): - zipname %= "linux64" - exe %= "" - if platform.endswith("darwin"): - zipname %= "mac64" - exe %= "" - return zipname, exe - - def force_kill_instances(self): + def force_kill_instances(exe_name): """ kills running instances. :param self: :return: True on success else False """ + exe_name = os.path.basename(exe_name) if IS_POSIX: - r = os.system("kill -f -9 $(pidof %s)" % self.exename) + r = os.system("kill -f -9 $(pidof %s)" % exe_name) else: - r = os.system("taskkill /f /im %s" % self.exename) + r = os.system("taskkill /f /im %s" % exe_name) return not r @staticmethod @@ -497,19 +590,19 @@ class Patcher(object): cdc[3] = "_" return "".join(cdc).encode() - def verify_patch(self): + def is_binary_patched(self, executable_path=None): """simple check if executable is patched. :return: False if not patched, else True """ - try: - with io.open(self.target_path, "rb") as fh: - for line in iter(lambda: fh.readline(), b""): - if b"cdc_" in line: - return False - return True - except FileNotFoundError: - return False + executable_path = executable_path or self.executable_path + with io.open(executable_path, "rb") as fh: + for line in iter(lambda: fh.readline(), b""): + if b"cdc_" in line: + return False + else: + return True + def patch_exe(self): """ @@ -517,12 +610,11 @@ class Patcher(object): :return: False on failure, binary name on success """ - - logger.info("patching driver executable %s" % self.target_path) + logger.info("patching driver executable %s" % self.executable_path) linect = 0 replacement = self.gen_random_cdc() - with io.open(self.target_path, "r+b") as fh: + with io.open(self.executable_path, "r+b") as fh: for line in iter(lambda: fh.readline(), b""): if b"cdc_" in line: fh.seek(-len(line), 1) @@ -532,5 +624,14 @@ class Patcher(object): return linect -class ChromeOptions(selenium.webdriver.chrome.webdriver.Options): - pass +# class ChromeOptions(selenium.webdriver.chrome.webdriver.Options): +class ChromeOptions(_ChromeOptions): + + def add_extension_file_crx(self, extension=None): + + if extension: + extension_to_add = os.path.abspath(os.path.expanduser(extension)) + logger.debug('extension_to_add: %s' % extension_to_add) + + return super().add_extension(r'%s' % extension) +