diff --git a/README.md b/README.md index 095fdcf..a0054b1 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# undetected_chromedriver +# undetected_chromedriver # https://github.com/ultrafunkamsterdam/undetected-chromedriver @@ -45,6 +45,7 @@ uc.install() from selenium.webdriver import Chrome driver = Chrome() driver.get('https://distilnetworks.com') + ``` diff --git a/setup.py b/setup.py index 9a3faa1..4ed2a93 100644 --- a/setup.py +++ b/setup.py @@ -16,21 +16,19 @@ from setuptools import setup setup( name="undetected-chromedriver", - version="1.5.2", + version="2.0.2", packages=["undetected_chromedriver"], install_requires=["selenium",], url="https://github.com/ultrafunkamsterdam/undetected-chromedriver", license="GPL-3.0", author="UltrafunkAmsterdam", author_email="info@blackhat-security.nl", - description=""" - Optimized Selenium/Chromedriver drop-in replacement for selenium.webdriver which does not trigger anti-bot services like Distil / CloudFlare / Imperva / DataDome / Botprotect.io and such. - All required anti-detection settings are built-in and ready to use, yet overridable if you\'d really want. - - Please note: results may vary, and depend on a lot of factors like settings, network, plugins, modus operandi. - No guarantees of any kind are given, yet I can guarantee ongoing and tenacious efforts evading and handling detection algorithms. - - For more information check out the README.""", + description="""\ + selenium.webdriver.Chrome replacement with focus on stealth. + not triggered by Distil / CloudFlare / Imperva / DataDome / hCaptcha and such. + + NOTE: results may vary due to many factors. No guarantees are given, except for ongoing efforts in understanding detection algorithms. + """, long_description=open("README.md").read(), long_description_content_type="text/markdown", classifiers=[ @@ -39,3 +37,4 @@ setup( "Programming Language :: Python :: 3.7", ], ) + diff --git a/tests/test_undetected_chromedriver.py b/tests/test_undetected_chromedriver.py new file mode 100644 index 0000000..c78f97b --- /dev/null +++ b/tests/test_undetected_chromedriver.py @@ -0,0 +1,36 @@ +import sys +import os + + +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) +import time # noqa + + +def test_undetected_chromedriver(): + + import undetected_chromedriver.v2 as uc + driver = uc.Chrome() + + with driver: + driver.get("https://coinfaucet.eu") + time.sleep(4) # sleep only used for timing of screenshot + driver.save_screenshot("coinfaucet.eu.png") + + with driver: + driver.get("https://cia.gov") + time.sleep(4) # sleep only used for timing of screenshot + driver.save_screenshot("cia.gov.png") + + with driver: + driver.get("https://lhcdn.botprotect.io") + time.sleep(4) # sleep only used for timing of screenshot + driver.save_screenshot("notprotect.io.png") + + with driver: + driver.get("https://www.datadome.co") + time.sleep(4) # sleep only used for timing of screenshot + driver.save_screenshot("datadome.co.png") + + +test_undetected_chromedriver() + diff --git a/undetected_chromedriver/v2.py b/undetected_chromedriver/v2.py new file mode 100644 index 0000000..131c0da --- /dev/null +++ b/undetected_chromedriver/v2.py @@ -0,0 +1,454 @@ +#!/usr/bin/env python3 +# this module is part of undetected_chromedriver + +""" +V2 beta + +whats new: + + - currently this v2 module will be available as option. + to use it / test it, you need to alter your imports by appending .v2 + + - headless mode not (yet) supported in v2 + + example: + + ```python + import undetected_chromedriver.v2 as uc + driver = uc.Chrome() + driver.get('https://somewebsite.xyz') + + # if site is protected by hCaptcha/Cloudflare + driver.get_in('https://cloudflareprotectedsite.xyz') + + # if site is protected by hCaptcha/Cloudflare + # (different syntax, same function) + with driver: + driver.get('https://cloudflareprotectedsite.xyz') + ``` + + tests/example in ../tests/test_undetected_chromedriver.py + +""" + +from __future__ import annotations + +import io +import logging +import os +import random +import re +import shutil +import string +import subprocess +import sys +import tempfile +import threading +import time +import zipfile +import atexit +import contextlib +from distutils.version import LooseVersion +from urllib.request import urlopen, urlretrieve + +import selenium.webdriver.chrome.service +import selenium.webdriver.chrome.webdriver +import selenium.webdriver.common.service +import selenium.webdriver.remote.webdriver + +__all__ = ("Chrome", "ChromeOptions", "Patcher", "find_chrome_executable") + +IS_POSIX = sys.platform.startswith(("darwin", "cygwin", "linux")) + +logger = logging.getLogger("uc") + + +def find_chrome_executable(): + """ + returns the full path to the chrome _browser binary + may not work if chrome is in a custom folder. + + :return: path to chrome executable + :rtype: str + """ + candidates = set() + if IS_POSIX: + for item in os.environ.get("PATH").split(os.pathsep): + for subitem in ("google-chrome", "chromium", "chromium-browser"): + candidates.add(os.sep.join((item, subitem))) + if 'darwin' in sys.platform: + candidates.update(["/Applications/Google Chrome.app/Contents/MacOS/Google Chrome"]) + else: + for item in map( + os.environ.get, ("PROGRAMFILES", "PROGRAMFILES(X86)", "LOCALAPPDATA") + ): + for subitem in ( + "Google/Chrome/Application", + "Google/Chrome Beta/Application", + "Google/Chrome Canary/Application", + ): + candidates.add(os.sep.join((item, subitem, "chrome.exe"))) + for candidate in candidates: + if os.path.exists(candidate) and os.access(candidate, os.X_OK): + return os.path.normpath(candidate) + + +class Chrome(selenium.webdriver.chrome.webdriver.WebDriver): + + __doc__ = """\ + -------------------------------------------------------------------------- + NOTE: + Chrome has everything included to work out of the box. + it does not `need` customizations. + any customizations MAY lead to trigger bot migitation systems. + + -------------------------------------------------------------------------- + """ + selenium.webdriver.remote.webdriver.WebDriver.__doc__ + + _instances = set() + + def __init__( + self, + executable_path="./chromedriver", + port=0, + options=None, + service_args=None, + desired_capabilities=None, + service_log_path=None, + chrome_options=None, + keep_alive=True, + debug_addr=None, + user_data_dir=None, + factor=0.5, + delay=1, + ): + + p = Patcher(target_path=executable_path) + p.auto(False) + self._patcher = p + self.factor = factor + self.delay = delay + self.port = port + self.process = None + self.browser_args = None + self._rcount = 0 + self._rdiff = 10 + + try: + dbg = debug_addr.split(":") + debug_host, debug_port = str(dbg[0]), int(dbg[1]) + except AttributeError: + debug_port = selenium.webdriver.common.service.utils.free_port() + debug_host = "127.0.0.1" + + if not debug_addr: + debug_addr = f"{debug_host}:{debug_port}" + + if not user_data_dir: + user_data_dir = os.path.normpath(tempfile.mkdtemp()) + + if not options: + options = selenium.webdriver.chrome.webdriver.Options() + + if not options.debugger_address: + options.debugger_address = debug_addr + + if not options.binary_location: + options.binary_location = find_chrome_executable() + + if not desired_capabilities: + desired_capabilities = options.to_capabilities() + + self.options = options + self.user_data_dir = user_data_dir + + extra_args = [] + if options.headless: + extra_args.append("--headless") + + self.browser_args = [ + find_chrome_executable(), + "--user-data-dir=%s" % user_data_dir, + "--remote-debugging-host=%s" % debug_host, + "--remote-debugging-port=%s" % debug_port, + "--log-level=%d" % divmod(logging.getLogger().getEffectiveLevel(), 10)[0], + *extra_args, + ] + + self.browser = subprocess.Popen( + self.browser_args, + close_fds="win32" in sys.platform, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + + selenium.webdriver.chrome.webdriver.WebDriver.__init__( + self, + executable_path=p.target_path, + port=port, + options=options, + service_args=service_args, + desired_capabilities=desired_capabilities, + service_log_path=service_log_path, + chrome_options=chrome_options, + keep_alive=keep_alive, + ) + + def start_session(self, capabilities=None, browser_profile=None): + if not capabilities: + capabilities = self.options.to_capabilities() + super().start_session(capabilities, browser_profile) + + def get_in(self, url: str, delay=2.5, factor=1): + """ + :param url: str + :param delay: int + :param factor: disconnect seconds after .get() + too low will disconnect before get() fired. + + ================================================= + + In case you are being detected by some sophisticated + algorithm, and you are the kind that hates losing, + this might be your friend. + + this currently works for hCaptcha based systems + (this includes CloudFlare!), and also passes many + custom setups (eg: ticketmaster.com), + + + Once you are past the first challenge, a cookie is saved + which (in my tests) also worked for other sites, and lasted + my entire session! However, to play safe, i'd recommend to just + call it once for every new site/domain you navigate to. + + NOTE: mileage may vary! + bad behaviour can still be detected, and this program does not + magically "fix" a flagged ip. + + please don't spam issues on github! first look if the issue + is not already reported. + """ + try: + self.get(url) + finally: + self.close() + # threading.Timer(factor or self.factor, self.close).start() + time.sleep(delay or self.delay) + self.start_session() + + def quit(self): + try: + self.browser.kill() + self.browser.wait(1) + except TimeoutError as e: + logger.debug(e, exc_info=True) + except Exception: # noqa + pass + try: + super().quit() + except Exception: # noqa + pass + try: + shutil.rmtree(self.user_data_dir, ignore_errors=False) + except PermissionError: + time.sleep(1) + self.quit() + + def __del__(self): + self.quit() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + threading.Timer(self.factor, self.start_session).start() + time.sleep(self.delay) + + def __hash__(self): + return hash(self.options.debugger_address) + + +class Patcher(object): + url_repo = "https://chromedriver.storage.googleapis.com" + + def __init__(self, target_path=None, force=False, version_main: int = 0): + + if not target_path: + target_path = os.path.join(tempfile.gettempdir(), 'undetected_chromedriver', 'chromedriver') + if not IS_POSIX: + if not target_path[-4:] == ".exe": + target_path += ".exe" + + self.force = force + z, e = self.get_package_name() + if not target_path: + target_path = e + + self.exename = e + self.target_path = target_path + self.zipname = z + self.version_main = version_main + self.version_full = None + + def auto(self, force=False): + try: + os.unlink(self.target_path) + except PermissionError: + + if force or self.force: + self.force_kill_instances() + return self.auto() + + if self.verify_patch(): + # assumes already running AND patched + return True + return False + except FileNotFoundError: + pass + + release = self.fetch_release_number() + self.version_main = release.version[0] + self.version_full = release + self.fetch_package() + self.unzip_package() + self.patch_exe() + return self.verify_patch() + + def fetch_release_number(self): + """ + Gets the latest major version available, or the latest major version of self.target_version if set explicitly. + :return: version string + :rtype: LooseVersion + """ + path = ( + "/" + + ( + "latest_release" + if not self.version_main + else f"latest_release_{self.version_main}" + ).upper() + ) + logger.debug("getting release number from %s" % path) + return LooseVersion(urlopen(self.url_repo + path).read().decode()) + + def parse_exe_version(self): + with io.open(self.target_path, "rb") as f: + for line in iter(lambda: f.readline(), b""): + match = re.search(br"platform_handle\x00content\x00([0-9\.]*)", line) + if match: + return LooseVersion(match[1].decode()) + + def fetch_package(self): + """ + Downloads ChromeDriver from source + + :return: path to downloaded file + """ + u = "%s/%s/%s" % (self.url_repo, self.version_full.vstring, self.zipname) + logger.debug("downloading from %s" % u) + zp, *_ = urlretrieve(u, filename=self.zipname) + return zp + + def unzip_package(self): + """ + Does what it says + + :return: path to unpacked executable + """ + logger.debug("unzipping %s" % self.zipname) + try: + os.makedirs(os.path.dirname(self.target_path), mode=0o755) + except OSError: + pass + with zipfile.ZipFile(self.zipname, mode='r') as zf: + zf.extract(self.exename) + os.rename(self.exename, self.target_path) + os.remove(self.zipname) + os.chmod(self.target_path, 0o755) + return self.target_path + + @staticmethod + def get_package_name(): + """ + returns a tuple of (zipname, exename) depending on platform. + + :return: (zipname, exename) + """ + zipname = "chromedriver_%s.zip" + exe = "chromedriver%s" + platform = sys.platform + if platform.endswith("win32"): + zipname %= "win32" + exe %= ".exe" + if platform.endswith("linux"): + zipname %= "linux64" + exe %= "" + if platform.endswith("darwin"): + zipname %= "mac64" + exe %= "" + return zipname, exe + + def force_kill_instances(self): + """ + kills running instances. + + :param self: + :return: True on success else False + """ + if IS_POSIX: + r = os.system("kill -f -9 $(pidof %s)" % self.exename) + else: + r = os.system("taskkill /f /im %s" % self.exename) + return not r + + @staticmethod + def gen_random_cdc(): + cdc = random.choices(string.ascii_lowercase, k=26) + cdc[-6:-4] = map(str.upper, cdc[-6:-4]) + cdc[2] = cdc[0] + cdc[3] = "_" + return "".join(cdc).encode() + + def verify_patch(self): + """simple check if executable is patched. + + :return: False if not patched, else True + """ + try: + with io.open(self.target_path, "rb") as fh: + for line in iter(lambda: fh.readline(), b""): + if b"cdc_" in line: + return False + return True + except FileNotFoundError: + return False + + def patch_exe(self): + """ + Patches the ChromeDriver binary + + :return: False on failure, binary name on success + """ + + logger.info("patching driver executable %s" % self.target_path) + + linect = 0 + replacement = self.gen_random_cdc() + with io.open(self.target_path, "r+b") as fh: + for line in iter(lambda: fh.readline(), b""): + if b"cdc_" in line: + fh.seek(-len(line), 1) + newline = re.sub(b"cdc_.{22}", replacement, line) + fh.write(newline) + linect += 1 + return linect + + def __del__(self): + shutil.rmtree(os.path.dirname(self.target_path), ignore_errors=True) + + +class ChromeOptions(selenium.webdriver.chrome.webdriver.Options): + pass