diff --git a/README.md b/README.md index 4332f81..7c6132c 100644 --- a/README.md +++ b/README.md @@ -1,131 +1,138 @@ -# undetected_chromedriver # - -https://github.com/ultrafunkamsterdam/undetected-chromedriver - -Optimized Selenium Chromedriver patch which does not trigger anti-bot services like Distill Network / Imperva / DataDome / Botprotect.io -Automatically downloads the driver binary and patches it. - -* **Tested until current chrome beta versions** -* **Works also on Brave Browser and many other Chromium based browsers** -* **Python 3.6++** - -## Installation ## -``` -pip install undetected-chromedriver -``` - -## Usage ## - -To prevent unnecessary hair-pulling and issue-raising, please mind the **[important note at the end of this document](#important-note) .** - -
- -#### The Version 2 way #### -Literally, this is all you have to do. Settings are included and your browser executable found automagically. -```python -import undetected_chromedriver.v2 as uc -driver = uc.Chrome() -with driver: - driver.get('https://coinfaucet.eu') # known url using cloudflare's "under attack mode" -``` - - -#### the easy way (recommended) #### -```python -import undetected_chromedriver as uc -driver = uc.Chrome() -driver.get('https://distilnetworks.com') -``` - - -#### target specific chrome version #### -```python -import undetected_chromedriver as uc -uc.TARGET_VERSION = 85 -driver = uc.Chrome() -``` - - -#### monkeypatch mode #### -Needs to be done before importing from selenium package - -```python -import undetected_chromedriver as uc -uc.install() - -from selenium.webdriver import Chrome -driver = Chrome() -driver.get('https://distilnetworks.com') - -``` - - -#### the customized way #### -```python -import undetected_chromedriver as uc - -#specify chromedriver version to download and patch -uc.TARGET_VERSION = 78 - -# or specify your own chromedriver binary (why you would need this, i don't know) - -uc.install( - executable_path='c:/users/user1/chromedriver.exe', -) - -opts = uc.ChromeOptions() -opts.add_argument(f'--proxy-server=socks5://127.0.0.1:9050') -driver = uc.Chrome(options=opts) -driver.get('https://distilnetworks.com') -``` - - -#### datadome.co example #### -These guys have actually a powerful product, and a link to this repo, which makes me wanna test their product. -Make sure you use a "clean" ip for this one. 
-```python -# -# STANDARD selenium Chromedriver -# -from selenium import webdriver -chrome = webdriver.Chrome() -chrome.get('https://datadome.co/customers-stories/toppreise-ends-web-scraping-and-content-theft-with-datadome/') -chrome.save_screenshot('datadome_regular_webdriver.png') -True # it caused my ip to be flagged, unfortunately - - -# -# UNDETECTED chromedriver (headless,even) -# -import undetected_chromedriver as uc -options = uc.ChromeOptions() -options.headless=True -options.add_argument('--headless') -chrome = uc.Chrome(options=options) -chrome.get('https://datadome.co/customers-stories/toppreise-ends-web-scraping-and-content-theft-with-datadome/') -chrome.save_screenshot('datadome_undetected_webddriver.png') - -``` -**Check both saved screenhots [here](https://imgur.com/a/fEmqadP)** - - - -## important note ## - -Due to the inner workings of the module, it is needed to browse programmatically (ie: using .get(url) ). Never use the gui to navigate. Using your keybord and mouse for navigation causes possible detection! New Tabs: same story. If you really need multi-tabs, then open the tab with the blank page (hint: url is `data:,` including comma, and yes, driver accepts it) and do your thing as usual. If you follow these "rules" (actually its default behaviour), then you will have a great time for now. - -TL;DR and for the visual-minded: - -```python -In [1]: import undetected_chromedriver as uc -In [2]: driver = uc.Chrome() -In [3]: driver.execute_script('return navigator.webdriver') -Out[3]: True # Detectable -In [4]: driver.get('https://distilnetworks.com') # starts magic -In [4]: driver.execute_script('return navigator.webdriver') -In [5]: None # Undetectable! 
-``` -## end important note ## - - - +# undetected_chromedriver # + +https://github.com/ultrafunkamsterdam/undetected-chromedriver + +Optimized Selenium Chromedriver patch which does not trigger anti-bot services like Distil Networks / Imperva / DataDome / Botprotect.io +Automatically downloads the driver binary and patches it. + +* **Tested until current chrome beta versions** +* **Works also on Brave Browser and many other Chromium based browsers** +* **Python 3.6++** + +## Installation ## +``` +pip install undetected-chromedriver +``` + +## Usage ## + +To prevent unnecessary hair-pulling and issue-raising, please mind the **[important note at the end of this document](#important-note-v1-old-stuff).** + +
+ +#### The Version 2 way #### +Literally, this is all you have to do. Settings are included and your browser executable found automagically. + +```python +import undetected_chromedriver.v2 as uc +driver = uc.Chrome() +with driver: + driver.get('https://coinfaucet.eu') # known url using cloudflare's "under attack mode" +``` + + + + +
+
+ +#### the easy way (v1 old stuff) #### +```python +import undetected_chromedriver as uc +driver = uc.Chrome() +driver.get('https://distilnetworks.com') +``` + + + +#### target specific chrome version (v1 old stuff) #### +```python +import undetected_chromedriver as uc +uc.TARGET_VERSION = 85 +driver = uc.Chrome() +``` + + +#### monkeypatch mode (v1 old stuff) #### +Needs to be done before importing from selenium package + +```python +import undetected_chromedriver as uc +uc.install() + +from selenium.webdriver import Chrome +driver = Chrome() +driver.get('https://distilnetworks.com') + +``` + + +#### the customized way (v1 old stuff) #### +```python +import undetected_chromedriver as uc + +#specify chromedriver version to download and patch +uc.TARGET_VERSION = 78 + +# or specify your own chromedriver binary (why you would need this, i don't know) + +uc.install( + executable_path='c:/users/user1/chromedriver.exe', +) + +opts = uc.ChromeOptions() +opts.add_argument(f'--proxy-server=socks5://127.0.0.1:9050') +driver = uc.Chrome(options=opts) +driver.get('https://distilnetworks.com') +``` + + +#### datadome.co example (v1 old stuff) #### +These guys have actually a powerful product, and a link to this repo, which makes me wanna test their product. +Make sure you use a "clean" ip for this one. 
+```python +# +# STANDARD selenium Chromedriver +# +from selenium import webdriver +chrome = webdriver.Chrome() +chrome.get('https://datadome.co/customers-stories/toppreise-ends-web-scraping-and-content-theft-with-datadome/') +chrome.save_screenshot('datadome_regular_webdriver.png') +True # it caused my ip to be flagged, unfortunately + + +# +# UNDETECTED chromedriver (headless,even) +# +import undetected_chromedriver as uc +options = uc.ChromeOptions() +options.headless=True +options.add_argument('--headless') +chrome = uc.Chrome(options=options) +chrome.get('https://datadome.co/customers-stories/toppreise-ends-web-scraping-and-content-theft-with-datadome/') +chrome.save_screenshot('datadome_undetected_webddriver.png') + +``` +**Check both saved screenshots [here](https://imgur.com/a/fEmqadP)** + + + +## important note (v1 old stuff) ## + +Due to the inner workings of the module, it is needed to browse programmatically (ie: using .get(url) ). Never use the gui to navigate. Using your keyboard and mouse for navigation causes possible detection! New Tabs: same story. If you really need multi-tabs, then open the tab with the blank page (hint: url is `data:,` including comma, and yes, driver accepts it) and do your thing as usual. If you follow these "rules" (actually it's default behaviour), then you will have a great time for now. + +TL;DR and for the visual-minded: + +```python +In [1]: import undetected_chromedriver as uc +In [2]: driver = uc.Chrome() +In [3]: driver.execute_script('return navigator.webdriver') +Out[3]: True # Detectable +In [4]: driver.get('https://distilnetworks.com') # starts magic +In [5]: driver.execute_script('return navigator.webdriver') +Out[5]: None # Undetectable! +``` +## end important note ## + + + diff --git a/setup.py b/setup.py index 6ce07e4..193158f 100644 --- a/setup.py +++ b/setup.py @@ -12,29 +12,49 @@ Y88b. 888 888 888 Y88..88P 888 888 888 Y8b. 
Y88b 888 888 888 Y BY ULTRAFUNKAMSTERDAM (https://github.com/ultrafunkamsterdam)""" from setuptools import setup +import os +import re +import codecs + + +dirname = os.path.abspath(os.path.dirname(__file__)) + +with codecs.open( + os.path.join(dirname, "undetected_chromedriver", "__init__.py"), + mode="r", + encoding="latin1", +) as fp: + try: + version = re.findall(r"^__version__ = ['\"]([^'\"]*)['\"]", fp.read(), re.M)[0] + except Exception: + raise RuntimeError("unable to determine version") setup( name="undetected-chromedriver", - version="2.2.1", + version=version, packages=["undetected_chromedriver"], - install_requires=["selenium",], + install_requires=[ + "selenium", + ], url="https://github.com/ultrafunkamsterdam/undetected-chromedriver", license="GPL-3.0", author="UltrafunkAmsterdam", author_email="info@blackhat-security.nl", description="""\ - selenium.webdriver.Chrome replacement with focus on stealth. - not triggered by Distil / CloudFlare / Imperva / DataDome / hCaptcha and such. + selenium.webdriver.Chrome replacement with compatibility for Brave, and other Chromium based browsers. + not triggered by CloudFlare/Imperva/hCaptcha and such. NOTE: results may vary due to many factors. No guarantees are given, except for ongoing efforts in understanding detection algorithms. 
""", - long_description=open("README.md").read(), + long_description=open(os.path.join(dirname, "README.md")).read(), long_description_content_type="text/markdown", classifiers=[ "License :: OSI Approved :: GNU General Public License v3 (GPLv3)", "Programming Language :: Python :: 3", "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", ], ) - diff --git a/tests/test_undetected_chromedriver.py b/tests/test_undetected_chromedriver.py deleted file mode 100644 index c78f97b..0000000 --- a/tests/test_undetected_chromedriver.py +++ /dev/null @@ -1,36 +0,0 @@ -import sys -import os - - -sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) -import time # noqa - - -def test_undetected_chromedriver(): - - import undetected_chromedriver.v2 as uc - driver = uc.Chrome() - - with driver: - driver.get("https://coinfaucet.eu") - time.sleep(4) # sleep only used for timing of screenshot - driver.save_screenshot("coinfaucet.eu.png") - - with driver: - driver.get("https://cia.gov") - time.sleep(4) # sleep only used for timing of screenshot - driver.save_screenshot("cia.gov.png") - - with driver: - driver.get("https://lhcdn.botprotect.io") - time.sleep(4) # sleep only used for timing of screenshot - driver.save_screenshot("notprotect.io.png") - - with driver: - driver.get("https://www.datadome.co") - time.sleep(4) # sleep only used for timing of screenshot - driver.save_screenshot("datadome.co.png") - - -test_undetected_chromedriver() - diff --git a/undetected_chromedriver/__init__.py b/undetected_chromedriver/__init__.py index 362a143..20cb328 100644 --- a/undetected_chromedriver/__init__.py +++ b/undetected_chromedriver/__init__.py @@ -31,7 +31,7 @@ from selenium.webdriver import Chrome as _Chrome from selenium.webdriver import ChromeOptions as _ChromeOptions logger = logging.getLogger(__name__) - +__version__ = "2.2.7" TARGET_VERSION = 
0 diff --git a/undetected_chromedriver/tests/test_undetected_chromedriver.py b/undetected_chromedriver/tests/test_undetected_chromedriver.py new file mode 100644 index 0000000..1669644 --- /dev/null +++ b/undetected_chromedriver/tests/test_undetected_chromedriver.py @@ -0,0 +1,66 @@ +import logging +import os +import sys +import time # noqa + +from ..v2 import * + +logging.basicConfig(level=10) + +logger = logging.getLogger("TEST") +logger.setLevel(20) + + +JS_SERIALIZE_FUNCTION = """ +decycle=function(n,e){"use strict";var t=new WeakMap;return function n(o,r){var c,i;return void 0!==e&&(o=e(o)),"object"!=typeof o||null===o||o instanceof Boolean||o instanceof Date||o instanceof Number||o instanceof RegExp||o instanceof String?o:void 0!==(c=t.get(o))?{$ref:c}:(t.set(o,r),Array.isArray(o)?(i=[],o.forEach(function(e,t){i[t]=n(e,r+"["+t+"]")})):(i={},Object.keys(o).forEach(function(e){i[e]=n(o[e],r+"["+JSON.stringify(e)+"]")})),i)}(n,"$")}; +function replacer(t){try{if(Array.prototype.splice.call(t).length<100){let e={};for(let r in t)e[r]=t[r];return e}}catch(t){}} +return decycle(window) +""" + + +def test_quick(): + import undetected_chromedriver.v2 as uc + + print("uc module: ", uc) + # options = selenium.webdriver.ChromeOptions() + options = uc.ChromeOptions() + + options.add_argument("--user-data-dir=c:\\temp") + options.binary_location = uc.find_chrome_executable() + driver = uc.Chrome( + executable_path="./chromedriver.exe", + options=options, + service_log_path="c:\\temp\\service.log.txt", + ) + while True: + sys.stdin.read() + + +def test_undetected_chromedriver(): + import undetected_chromedriver.v2 as uc + + driver = uc.Chrome() + + with driver: + driver.get("https://coinfaucet.eu") + time.sleep(4) # sleep only used for timing of screenshot + driver.save_screenshot("coinfaucet.eu.png") + + with driver: + driver.get("https://cia.gov") + time.sleep(4) # sleep only used for timing of screenshot + driver.save_screenshot("cia.gov.png") + + with driver: + 
driver.get("https://lhcdn.botprotect.io") + time.sleep(4) # sleep only used for timing of screenshot + driver.save_screenshot("notprotect.io.png") + + with driver: + driver.get("https://www.datadome.co") + time.sleep(4) # sleep only used for timing of screenshot + driver.save_screenshot("datadome.co.png") + + +# test_quick() +# #test_undetected_chromedriver() diff --git a/undetected_chromedriver/tests/v2/test_uc.py b/undetected_chromedriver/tests/v2/test_uc.py new file mode 100644 index 0000000..f42bf7c --- /dev/null +++ b/undetected_chromedriver/tests/v2/test_uc.py @@ -0,0 +1,36 @@ +import pytest +from _pytest.fixtures import FixtureRequest +import undetected_chromedriver.v2 as uc + +FAILED_SCREENSHOT_NAME = "failed.png" + + +@pytest.fixture +def head_uc(request: FixtureRequest): + request.instance.driver = uc.Chrome() + + def teardown(): + request.instance.driver.save_screenshot(FAILED_SCREENSHOT_NAME) + request.instance.driver.quit() + + request.addfinalizer(teardown) + + return request.instance.driver + + +@pytest.fixture +def headless_uc(request: FixtureRequest): + options = uc.ChromeOptions() + options.headless = True + request.instance.driver = uc.Chrome(options=options) + + def teardown(): + request.instance.driver.save_screenshot(FAILED_SCREENSHOT_NAME) + request.instance.driver.quit() + + request.addfinalizer(teardown) + + return request.instance.driver + + +pytest.main() diff --git a/undetected_chromedriver/v2.py b/undetected_chromedriver/v2.py index 552fa19..061bef1 100644 --- a/undetected_chromedriver/v2.py +++ b/undetected_chromedriver/v2.py @@ -31,7 +31,6 @@ whats new: """ - from __future__ import annotations import io @@ -44,11 +43,8 @@ import string import subprocess import sys import tempfile -import threading import time import zipfile -import atexit -import contextlib from distutils.version import LooseVersion from urllib.request import urlopen, urlretrieve @@ -56,21 +52,25 @@ import selenium.webdriver.chrome.service import 
selenium.webdriver.chrome.webdriver import selenium.webdriver.common.service import selenium.webdriver.remote.webdriver +from selenium.webdriver.chrome.options import Options as _ChromeOptions __all__ = ("Chrome", "ChromeOptions", "Patcher", "find_chrome_executable") IS_POSIX = sys.platform.startswith(("darwin", "cygwin", "linux")) logger = logging.getLogger("uc") +logger.setLevel(logging.getLogger().getEffectiveLevel()) def find_chrome_executable(): """ - returns the full path to the chrome _browser binary - may not work if chrome is in a custom folder. + Finds the chrome, chrome beta, chrome canary, chromium executable + + Returns + ------- + executable_path : str + the full file path to found executable - :return: path to chrome executable - :rtype: str """ candidates = set() if IS_POSIX: @@ -97,114 +97,232 @@ def find_chrome_executable(): class Chrome(object): + """ + Controls the ChromeDriver and allows you to drive the browser. + + The webdriver file will be downloaded by this module automatically, + you do not need to specify this. however, you may if you wish. + + + Attributes + ---------- + + + Methods + ------- + + reconnect() + + this can be useful in case of heavy detection methods + -stops the chromedriver service which runs in the background + -starts the chromedriver service which runs in the background + -recreate session + + + start_session(capabilities=None, browser_profile=None) + + differentiates from the regular method in that it does not + require a capabilities argument. The capabilities are automatically + recreated from the options at creation time. + - __doc__ = ( - """\ -------------------------------------------------------------------------- - NOTE: - Chrome has everything included to work out of the box. - it does not `need` customizations. - any customizations MAY lead to trigger bot migitation systems. - + NOTE: + Chrome has everything included to work out of the box. + it does not `need` customizations. 
+ any customizations MAY lead to trigger bot migitation systems. + -------------------------------------------------------------------------- """ - + selenium.webdriver.remote.webdriver.WebDriver.__doc__ - ) _instances = set() def __init__( self, - executable_path="./chromedriver", + executable_path=None, port=0, options=None, service_args=None, desired_capabilities=None, service_log_path=None, - chrome_options=None, keep_alive=True, - debug_addr=None, - user_data_dir=None, - factor=1, - delay=2, + log_level=0, + headless=False, emulate_touch=False, + delay=5, ): + """ + Creates a new instance of the chrome driver. - p = Patcher(target_path=executable_path) - p.auto(False) + Starts the service and then creates new instance of chrome driver. - self._patcher = p - self.factor = factor - self.delay = delay - self.port = port - self.process = None - self.browser_args = None - self._rcount = 0 - self._rdiff = 10 - try: - dbg = debug_addr.split(":") - debug_host, debug_port = str(dbg[0]), int(dbg[1]) - except AttributeError: - debug_port = selenium.webdriver.common.service.utils.free_port() - debug_host = "127.0.0.1" + Parameters + ---------- + executable_path: str, optional, default: None - use find_chrome_executable + Path to the executable. If the default is used it assumes the executable is in the $PATH - if not debug_addr: - debug_addr = f"{debug_host}:{debug_port}" + port: int, optional, default: 0 + port you would like the service to run, if left as 0, a free port will be found. - if not user_data_dir: - user_data_dir = os.path.normpath(tempfile.mkdtemp()) + options: ChromeOptions, optional, default: None - automatic useful defaults + this takes an instance of ChromeOptions, mainly to customize browser behavior. + anything other dan the default, for example extensions or startup options + are not supported in case of failure, and can probably lowers your undetectability. 
+ + service_args: list of str, optional, default: None + arguments to pass to the driver service + + desired_capabilities: dict, optional, default: None - auto from config + Dictionary object with non-browser specific capabilities only, such as "proxy" or "loggingPref". + + service_log_path: str, optional, default: None + path to log information from the driver. + + keep_alive: bool, optional, default: True + Whether to configure ChromeRemoteConnection to use HTTP keep-alive. + + log_level: int, optional, default: adapts to python global log level + + headless: bool, optional, default: False + can also be specified in the options instance. + Specify whether you want to use the browser in headless mode. + warning: this lowers undetectability and is not fully supported. + + emulate_touch: bool, optional, default: False + if set to True, patches navigator.maxTouchPoints to always return non-zero + + delay: int, optional, default: 5 + delay in seconds to wait before giving back control. + this is used only when using the context manager + (`with` statement) to bypass, for example CloudFlare. + 5 seconds is a foolproof value. 
+ + """ + + patcher = Patcher(executable_path=executable_path) + patcher.auto() if not options: options = selenium.webdriver.chrome.webdriver.Options() + try: + if options.session and options.session is not None: + # prevent reuse of options, + # as it just appends arguments, not replace them + # you'll get conflicts starting chrome + raise RuntimeError("you cannot reuse the ChromeOptions object") + except AttributeError: + pass + options.session = self + + debug_port = selenium.webdriver.common.service.utils.free_port() + debug_host = "127.0.0.1" if not options.debugger_address: - options.debugger_address = debug_addr + options.debugger_address = "%s:%d" % (debug_host, debug_port) + + options.add_argument("--remote-debugging-host=%s " % debug_host) + options.add_argument("--remote-debugging-port=%s" % debug_port) + + # see if a custom user profile is specified + for arg in options.arguments: + if "user-data-dir" in arg: + m = re.search("(?:--)?user-data-dir(?:[ =])?(.*)", arg) + try: + user_data_dir = m[1] + logger.debug( + "user-data-dir found in user argument %s => %s" % (arg, m[1]) + ) + keep_user_data_dir = True + break + except IndexError: + logger.debug( + "no user data dir could be extracted from supplied argument %s " + % arg + ) + else: + user_data_dir = os.path.normpath(tempfile.mkdtemp()) + keep_user_data_dir = False + arg = "--user-data-dir=%s" % user_data_dir + options.add_argument(arg) + logger.debug( + "created a temporary folder in which the user-data (profile) will be stored during this\n" + "session, and added it to chrome startup arguments: %s" % arg + ) if not options.binary_location: options.binary_location = find_chrome_executable() + self._delay = delay + + self.user_data_dir = user_data_dir + self.keep_user_data_dir = keep_user_data_dir + + if headless or options.headless: + options.headless = True + options.add_argument("--window-size=1920,1080") + options.add_argument("--start-maximized") + + options.add_argument( + "--log-level=%d" % 
log_level + or divmod(logging.getLogger().getEffectiveLevel(), 10)[0] + ) + + # fix exit_type flag to prevent tab-restore nag + try: + with open( + os.path.join(user_data_dir, "Default/Preferences"), + encoding="latin1", + mode="r+", + ) as fs: + import json + + config = json.load(fs) + if config["profile"]["exit_type"] is not None: + # fixing the restore-tabs-nag + config["profile"]["exit_type"] = None + fs.seek(0, 0) + fs.write(json.dumps(config, indent=4)) + logger.debug("fixed exit_type flag") + except Exception as e: + logger.debug("did not find a bad exit_type flag ") + + self.options = options + if not desired_capabilities: desired_capabilities = options.to_capabilities() - self.options = options - self.user_data_dir = user_data_dir - - extra_args = options.arguments - - if options.headless: - extra_args.append("--headless") - extra_args.append("--window-size=1920,1080") - - self.browser_args = [ - find_chrome_executable(), - "--user-data-dir=%s" % user_data_dir, - "--remote-debugging-host=%s" % debug_host, - "--remote-debugging-port=%s" % debug_port, - "--log-level=%d" % divmod(logging.getLogger().getEffectiveLevel(), 10)[0], - *extra_args, - ] + # unlock_port(debug_port) self.browser = subprocess.Popen( - self.browser_args, - # close_fds="win32" in sys.platform, + [options.binary_location, *options.arguments], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) self.webdriver = selenium.webdriver.chrome.webdriver.WebDriver( - executable_path=p.target_path, + executable_path=patcher.executable_path, port=port, options=options, service_args=service_args, desired_capabilities=desired_capabilities, service_log_path=service_log_path, - chrome_options=chrome_options, keep_alive=keep_alive, ) + self.__class__._instances.add((self, options)) if options.headless: + if emulate_touch: + self.execute_cdp_cmd( + "Page.addScriptToEvaluateOnNewDocument", + { + "source": """ + Object.defineProperty(navigator, 'maxTouchPoints', { + get: () => 1 + })""" 
+ }, + ) orig_get = self.webdriver.get @@ -228,29 +346,29 @@ class Chrome(object): : target[key] }) }); - - Object.defineProperty(Notification, "permission", { - configurable: true, - enumerable: true, - get: () => { - return "unknown" - }, - }); """ }, ) - logger.info("removing headless from user-agent string") + logger.info("removing headless from user-agent string") - self.execute_cdp_cmd( - "Network.setUserAgentOverride", - { - "userAgent": self.execute_script( - "return navigator.userAgent" - ).replace("Headless", "") - }, - ) - logger.info("fixing notifications permission in headless browsers") + self.execute_cdp_cmd( + "Network.setUserAgentOverride", + { + "userAgent": self.execute_script( + "return navigator.userAgent" + ).replace("Headless", "") + }, + ) + self.execute_cdp_cmd( + "Page.addScriptToEvaluateOnNewDocument", + { + "source": """ + // fix Notification permission in headless mode + Object.defineProperty(Notification, 'permission', { get: () => "default"}); + """ + }, + ) if emulate_touch: self.execute_cdp_cmd( @@ -278,50 +396,27 @@ class Chrome(object): def __dir__(self): return object.__dir__(self) + object.__dir__(self.webdriver) + def reconnect(self): + try: + self.service.stop() + except Exception as e: + logger.debug(e) + + try: + self.service.start() + except Exception as e: + logger.debug(e) + + try: + self.start_session() + except Exception as e: + logger.debug(e) + def start_session(self, capabilities=None, browser_profile=None): if not capabilities: capabilities = self.options.to_capabilities() self.webdriver.start_session(capabilities, browser_profile) - def get_in(self, url: str, delay=2, factor=1): - """ - :param url: str - :param delay: int - :param factor: disconnect seconds after .get() - too low will disconnect before get() fired. - - ================================================= - - In case you are being detected by some sophisticated - algorithm, and you are the kind that hates losing, - this might be your friend. 
- - this currently works for hCaptcha based systems - (this includes CloudFlare!), and also passes many - custom setups (eg: ticketmaster.com), - - - Once you are past the first challenge, a cookie is saved - which (in my tests) also worked for other sites, and lasted - my entire session! However, to play safe, i'd recommend to just - call it once for every new site/domain you navigate to. - - NOTE: mileage may vary! - bad behaviour can still be detected, and this program does not - magically "fix" a flagged ip. - - please don't spam issues on github! first look if the issue - is not already reported. - """ - try: - self.get(url) - finally: - self.service.stop() - # threading.Timer(factor or self.factor, self.close).start() - time.sleep(delay or self.delay) - self.service.start() - self.start_session() - def quit(self): logger.debug("closing webdriver") try: @@ -336,13 +431,20 @@ class Chrome(object): logger.debug(e, exc_info=True) except Exception: # noqa pass - try: - logger.debug("removing profile : %s" % self.user_data_dir) - shutil.rmtree(self.user_data_dir, ignore_errors=False) - except PermissionError: - logger.debug("permission error. files are still in use/locked. retying...") - time.sleep(1) - self.quit() + if not self.keep_user_data_dir or self.keep_user_data_dir is False: + for _ in range(3): + try: + logger.debug("removing profile : %s" % self.user_data_dir) + shutil.rmtree(self.user_data_dir, ignore_errors=False) + except FileNotFoundError: + pass + except PermissionError: + logger.debug( + "permission error. files are still in use/locked. retying..." 
+ ) + else: + break + time.sleep(1) def __del__(self): self.quit() @@ -352,59 +454,121 @@ class Chrome(object): def __exit__(self, exc_type, exc_val, exc_tb): self.service.stop() - # threading.Timer(self.factor, self.service.start).start() - time.sleep(self.delay) + time.sleep(self._delay) self.service.start() self.start_session() def __hash__(self): return hash(self.options.debugger_address) + def find_elements_by_text(self, text: str): + for elem in self.find_elements_by_css_selector("*"): + try: + if text.lower() in elem.text.lower(): + yield elem + except Exception as e: + logger.debug("find_elements_by_text: %s" % e) + + def find_element_by_text(self, text: str): + for elem in self.find_elements_by_css_selector("*"): + try: + if text.lower() in elem.text.lower(): + return elem + except Exception as e: + logger.debug("find_elements_by_text: %s" % e) + class Patcher(object): url_repo = "https://chromedriver.storage.googleapis.com" + zip_name = "chromedriver_%s.zip" + exe_name = "chromedriver%s" - def __init__( - self, target_path="./chromedriver", force=False, version_main: int = 0 - ): - if not IS_POSIX: - if not target_path[-4:] == ".exe": - target_path += ".exe" + platform = sys.platform + if platform.endswith("win32"): + zip_name %= "win32" + exe_name %= ".exe" + if platform.endswith("linux"): + zip_name %= "linux64" + exe_name %= "" + if platform.endswith("darwin"): + zip_name %= "mac64" + exe_name %= "" + + if platform.endswith("win32"): + d = "~/appdata/roaming/undetected_chromedriver" + elif platform.startswith("linux"): + d = "~/.local/share/undetected_chromedriver" + elif platform.endswith("darwin"): + d = "~/Library/Application Support/undetected_chromedriver" + else: + d = "~/.undetected_chromedriver" + data_path = os.path.abspath(os.path.expanduser(d)) + + def __init__(self, executable_path=None, force=False, version_main: int = 0): + """ + + Args: + executable_path: None = automatic + a full file path to the chromedriver executable + force: False + 
terminate processes which are holding lock + version_main: 0 = auto + specify main chrome version (rounded, ex: 82) + """ self.force = force - z, e = self.get_package_name() - if not target_path: - target_path = e - self.exename = e - self.target_path = target_path - self.zipname = z + if not executable_path: + executable_path = os.path.join(self.data_path, self.exe_name) + + if not IS_POSIX: + if not executable_path[-4:] == ".exe": + executable_path += ".exe" + + self.zip_path = os.path.join(self.data_path, self.zip_name) + + self.executable_path = os.path.abspath(os.path.join(".", executable_path)) + self.version_main = version_main self.version_full = None - def auto(self, force=False): + @classmethod + def auto(cls, executable_path=None, force=False): + """ + + Args: + force: + + Returns: + + """ + i = cls(executable_path, force=force) try: - os.unlink(self.target_path) + os.unlink(i.executable_path) except PermissionError: - - if force or self.force: - self.force_kill_instances() - return self.auto() - - if self.verify_patch(): - # assumes already running AND patched - return True - return False + if i.force: + cls.force_kill_instances(i.executable_path) + return i.auto(force=False) + try: + if i.is_binary_patched(): + # assumes already running AND patched + return True + except PermissionError: + pass + # return False except FileNotFoundError: pass - release = self.fetch_release_number() - self.version_main = release.version[0] - self.version_full = release - self.fetch_package() - self.unzip_package() + release = i.fetch_release_number() + i.version_main = release.version[0] + i.version_full = release + i.unzip_package(i.fetch_package()) + i.patch() + return i + + def patch(self): self.patch_exe() - return self.verify_patch() + return self.is_binary_patched() def fetch_release_number(self): """ @@ -420,9 +584,9 @@ class Patcher(object): return LooseVersion(urlopen(self.url_repo + path).read().decode()) def parse_exe_version(self): - with 
io.open(self.target_path, "rb") as f: + with io.open(self.executable_path, "rb") as f: for line in iter(lambda: f.readline(), b""): - match = re.search(br"platform_handle\x00content\x00([0-9\.]*)", line) + match = re.search(br"platform_handle\x00content\x00([0-9.]*)", line) if match: return LooseVersion(match[1].decode()) @@ -432,61 +596,44 @@ class Patcher(object): :return: path to downloaded file """ - u = "%s/%s/%s" % (self.url_repo, self.version_full.vstring, self.zipname) + u = "%s/%s/%s" % (self.url_repo, self.version_full.vstring, self.zip_name) logger.debug("downloading from %s" % u) - zp, *_ = urlretrieve(u, filename=self.zipname) - return zp + # return urlretrieve(u, filename=self.data_path)[0] + return urlretrieve(u)[0] - def unzip_package(self): + def unzip_package(self, fp): """ Does what it says :return: path to unpacked executable """ - logger.debug("unzipping %s" % self.zipname) + logger.debug("unzipping %s" % fp) try: - os.makedirs(os.path.dirname(self.target_path), mode=0o755) - except OSError: + os.unlink(self.zip_path) + except (FileNotFoundError, OSError): pass - with zipfile.ZipFile(self.zipname, mode="r") as zf: - zf.extract(self.exename) - os.rename(self.exename, self.target_path) - os.remove(self.zipname) - os.chmod(self.target_path, 0o755) - return self.target_path + + os.makedirs(self.data_path, mode=0o755, exist_ok=True) + + with zipfile.ZipFile(fp, mode="r") as zf: + zf.extract(self.exe_name, os.path.dirname(self.executable_path)) + os.remove(fp) + os.chmod(self.executable_path, 0o755) + return self.executable_path @staticmethod - def get_package_name(): - """ - returns a tuple of (zipname, exename) depending on platform. 
- - :return: (zipname, exename) - """ - zipname = "chromedriver_%s.zip" - exe = "chromedriver%s" - platform = sys.platform - if platform.endswith("win32"): - zipname %= "win32" - exe %= ".exe" - if platform.endswith("linux"): - zipname %= "linux64" - exe %= "" - if platform.endswith("darwin"): - zipname %= "mac64" - exe %= "" - return zipname, exe - - def force_kill_instances(self): + def force_kill_instances(exe_name): """ kills running instances. + :param: executable name to kill, may be a path as well - :param self: :return: True on success else False """ + exe_name = os.path.basename(exe_name) if IS_POSIX: - r = os.system("kill -f -9 $(pidof %s)" % self.exename) + r = os.system("kill -f -9 $(pidof %s)" % exe_name) else: - r = os.system("taskkill /f /im %s" % self.exename) + r = os.system("taskkill /f /im %s" % exe_name) return not r @staticmethod @@ -497,19 +644,18 @@ class Patcher(object): cdc[3] = "_" return "".join(cdc).encode() - def verify_patch(self): + def is_binary_patched(self, executable_path=None): """simple check if executable is patched. 
:return: False if not patched, else True """ - try: - with io.open(self.target_path, "rb") as fh: - for line in iter(lambda: fh.readline(), b""): - if b"cdc_" in line: - return False - return True - except FileNotFoundError: - return False + executable_path = executable_path or self.executable_path + with io.open(executable_path, "rb") as fh: + for line in iter(lambda: fh.readline(), b""): + if b"cdc_" in line: + return False + else: + return True def patch_exe(self): """ @@ -517,12 +663,11 @@ class Patcher(object): :return: False on failure, binary name on success """ - - logger.info("patching driver executable %s" % self.target_path) + logger.info("patching driver executable %s" % self.executable_path) linect = 0 replacement = self.gen_random_cdc() - with io.open(self.target_path, "r+b") as fh: + with io.open(self.executable_path, "r+b") as fh: for line in iter(lambda: fh.readline(), b""): if b"cdc_" in line: fh.seek(-len(line), 1) @@ -531,6 +676,46 @@ class Patcher(object): linect += 1 return linect + def __repr__(self): + return "{0:s}({1:s})".format( + self.__class__.__name__, + self.executable_path, + ) -class ChromeOptions(selenium.webdriver.chrome.webdriver.Options): - pass + +# +# +# def unlock_port(port): +# import os +# if not IS_POSIX: +# try: +# +# c = subprocess.Popen('netstat -ano | findstr :%d' % port, shell=True, stdout=subprocess.PIPE, +# stderr=subprocess.PIPE) +# stdout, stderr = c.communicate() +# lines = stdout.splitlines() +# _pid = lines[0].split(b' ')[-1].decode() +# c = subprocess.Popen(['taskkill', '/f', '/pid', _pid], shell=True, stdout=subprocess.PIPE, +# stderr=subprocess.PIPE) +# stdout, stderr = c.communicate() +# except Exception as e: +# logger.debug(e) +# +# else: +# try: +# os.system('kill -15 $(lsof -i:%d)' % port) +# except Exception: +# pass +# + + +class ChromeOptions(_ChromeOptions): + + session = None + + def add_extension_file_crx(self, extension=None): + if extension: + extension_to_add = 
os.path.abspath(os.path.expanduser(extension)) + logger.debug("extension_to_add: %s" % extension_to_add) + + return super().add_extension(r"%s" % extension)