2.2.6 - hodl your breath (#161)

* 2.2.2

* fixed a number of bugs
- specifying custom profile
- specifying custom binary path
- downloading, patching and storing the driver now happen in a writable
    folder (unless a path is explicitly specified) instead of the current working dir.
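To make the "writable folder" point concrete: the `Patcher` changes later in this commit pick the chromedriver archive name, the executable name and a per-user storage folder from `sys.platform`. The sketch below restates those choices as two plain functions so they can be checked for any platform string. The function names `driver_file_names` and `default_data_path` are hypothetical (not part of undetected_chromedriver); the file and folder names are copied from the diff.

```python
import os
import sys
from typing import Tuple


def driver_file_names(platform: str = sys.platform) -> Tuple[str, str]:
    """Return (zip_name, exe_name) for the chromedriver download.

    Hypothetical helper mirroring the class-level checks in the Patcher diff.
    Unlike the diff, it raises on unknown platforms instead of leaving an
    unformatted "%s" placeholder in the names.
    """
    if platform.endswith("win32"):
        return "chromedriver_win32.zip", "chromedriver.exe"
    if platform.endswith("linux"):
        return "chromedriver_linux64.zip", "chromedriver"
    if platform.endswith("darwin"):
        return "chromedriver_mac64.zip", "chromedriver"
    raise RuntimeError("unsupported platform: %s" % platform)


def default_data_path(platform: str = sys.platform) -> str:
    """Return a per-user folder for storing the downloaded, patched driver.

    Hypothetical helper using the same folder names as the Patcher diff.
    """
    if platform.endswith("win32"):
        d = "~/appdata/roaming/undetected_chromedriver"
    elif platform.startswith("linux"):
        d = "~/.local/share/undetected_chromedriver"
    elif platform.endswith("darwin"):
        d = "~/Library/Application Support/undetected_chromedriver"
    else:
        d = "~/.undetected_chromedriver"
    # expanduser turns "~" into the current user's home directory
    return os.path.abspath(os.path.expanduser(d))
```

For example, `os.path.join(default_data_path(), driver_file_names()[1])` gives the path the patched binary would be written to; the folder still has to be created (e.g. with `os.makedirs(..., exist_ok=True)`) before the first download.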

Committer: UltrafunkAmsterdam <UltrafunkAmsterdam@github>

* tidy up

* uncomment block

* - support for specifying and reusing the user profile folder.
    if a user-data-dir is specified, that folder will NOT be
    deleted on exit.
    example:
        options.add_argument('--user-data-dir=c:\\temp')

- uses a platform-specific app data folder to store the driver instead
    of the current workdir.

- improved headless mode. fixed detection via notification permissions.

- eliminates the "restore tabs" notification at startup

- added methods find_elements_by_text and find_element_by_text

- updated docs (partly)

- known issues:
    - extensions not running. this is due to the inner workings
        of chromedriver. still working on this.
    - the driver window does not always close when the program exits.
    - MacOS: startup nag notifications. might be solved by
        (re)using a profile directory.

- known stuff:
    - some specific use cases, network conditions or behaviour
      can still lead to detection.
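The profile handling in the first bullet boils down to one decision: if the Chrome arguments already carry a `--user-data-dir`, reuse that folder and keep it on exit; otherwise create a temporary folder and delete it on exit. A minimal sketch of that decision, assuming the arguments are a plain list of strings; `resolve_profile_dir` is a hypothetical helper, and the regular expression is the one used in the `v2` `Chrome.__init__` diff of this commit.

```python
import re
import tempfile
from typing import List, Tuple

# Same pattern as the v2 diff: optional "--", then "user-data-dir",
# then an optional "=" or space, then the folder path.
USER_DATA_DIR_RE = re.compile(r"(?:--)?user-data-dir(?:[ =])?(.*)")


def resolve_profile_dir(arguments: List[str]) -> Tuple[str, bool]:
    """Return (profile_dir, keep_on_exit) for a list of Chrome arguments.

    Hypothetical helper: a user-supplied folder is reused and kept;
    otherwise a fresh temporary folder is created and flagged for deletion.
    """
    for arg in arguments:
        if "user-data-dir" in arg:
            m = USER_DATA_DIR_RE.search(arg)
            # skip a flag that carries no path at all
            if m and m.group(1):
                return m.group(1), True
    # no usable flag found: create a throwaway profile folder
    return tempfile.mkdtemp(), False
```

For instance, `resolve_profile_dir(["--user-data-dir=c:\\temp"])` yields the user's folder with the keep flag set, so a cleanup step can skip deleting it. Unlike the diff, the sketch also ignores a flag with an empty path instead of treating it as a user folder.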

* Squashed commit of the following:

commit 7ce8e7a236cbee770cb117145d4bf6dc245b936a
Author: ultrafunkamsterdam <info@blackhat-security.nl>
Date:   Fri Apr 30 18:22:39 2021 +0200

    readme change

commit f214dcf33f26f8b35616d7b61cf6dee656596c3f
Author: ultrafunkamsterdam <info@blackhat-security.nl>
Date:   Fri Apr 30 18:18:09 2021 +0200

    - make sure options cannot be reused, as reusing them
        causes duplicate and conflicting arguments to chrome
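The reuse guard can be pictured as tagging the options object with the driver that consumed it and refusing any second claim. A minimal sketch with a stand-in class; `FakeOptions` and `claim_options` are hypothetical names, while the real change stores the driver on the selenium options object's `session` attribute.

```python
class FakeOptions:
    """Stand-in for a ChromeOptions object (hypothetical)."""

    def __init__(self):
        self.arguments = []
        self.session = None  # set once a driver has consumed these options


def claim_options(options, owner):
    """Mark `options` as used by `owner`; refuse if already claimed.

    Reusing the same options object would append the same arguments a
    second time, producing duplicate or conflicting Chrome flags.
    """
    if getattr(options, "session", None) is not None:
        raise RuntimeError("you cannot reuse the ChromeOptions object")
    options.session = owner
    return options
```

Creating a fresh options object per driver instance is the way around the error.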

commit cf059a638c
Author: ultrafunkamsterdam <info@blackhat-security.nl>
Date:   Thu Apr 29 12:54:49 2021 +0200

    - support for specifying and reusing the user profile folder.
        if a user-data-dir is specified, that folder will NOT be
        deleted on exit.
        example:
            options.add_argument('--user-data-dir=c:\\temp')

    - uses a platform-specific app data folder to store the driver instead
        of the current workdir.

    - improved headless mode. fixed detection via notification permissions.

    - eliminates the "restore tabs" notification at startup

    - added methods find_elements_by_text and find_element_by_text

    - updated docs (partly)

    - known issues:
        - extensions not running. this is due to the inner workings
            of chromedriver. still working on this.
        - the driver window does not always close when the program exits.
        - MacOS: startup nag notifications. might be solved by
            (re)using a profile directory.

    - known stuff:
        - some specific use cases, network conditions or behaviour
          can still lead to detection.
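The "restore tabs" fix works on the profile's `Default/Preferences` JSON file: the `v2` diff sets `profile.exit_type` to null before Chrome starts, on the assumption that a non-null value (for example after an unclean shutdown) is what triggers the prompt. A standalone sketch of that step, assuming the same file layout as the diff; `clear_exit_type` is a hypothetical helper.

```python
import json
import os


def clear_exit_type(profile_dir: str) -> bool:
    """Set profile.exit_type to null in <profile_dir>/Default/Preferences.

    Returns True if the file was changed. Hypothetical helper; a missing
    or unparsable Preferences file is simply skipped.
    """
    path = os.path.join(profile_dir, "Default", "Preferences")
    try:
        with open(path, encoding="latin1") as fs:
            config = json.load(fs)
    except (OSError, ValueError):
        return False  # no profile yet, or not valid JSON
    profile = config.get("profile", {})
    if profile.get("exit_type") is None:
        return False  # already clean, nothing to write
    profile["exit_type"] = None
    # reopen in "w" mode so the file is truncated before writing
    with open(path, "w", encoding="latin1") as fs:
        json.dump(config, fs)
    return True
```

Writing through a fresh handle, rather than seeking back to the start of an `r+` handle as the diff does, truncates the file, so no stale bytes of the old content can remain if the new JSON is shorter.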

commit b40d23c649
Author: ultrafunkamsterdam <info@blackhat-security.nl>
Date:   Tue Apr 27 20:41:18 2021 +0200

    uncomment block

commit d99809c8c6
Author: ultrafunkamsterdam <info@blackhat-security.nl>
Date:   Tue Apr 27 20:19:51 2021 +0200

    tidy up

* .

* 2.2.7

Co-authored-by: ultrafunkamsterdam <info@blackhat-security.nl>
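`find_elements_by_text` and `find_element_by_text`, mentioned above, iterate over every element returned by the CSS selector `*` and test whether the lowercased search text occurs in the element's lowercased `.text`. A browser-free sketch of that matching logic, assuming only that each element has a `.text` attribute; `FakeElement`, `elements_by_text` and `element_by_text` are hypothetical stand-ins, not selenium or undetected_chromedriver APIs.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator, Optional


@dataclass
class FakeElement:
    """Stand-in for a selenium WebElement; only .text is used."""
    text: str


def elements_by_text(elements: Iterable, text: str) -> Iterator:
    """Yield every element whose text contains `text`, ignoring case."""
    needle = text.lower()
    for elem in elements:
        try:
            if needle in elem.text.lower():
                yield elem
        except Exception:
            # e.g. a stale element; the diff logs these and moves on
            continue


def element_by_text(elements: Iterable, text: str) -> Optional[object]:
    """Return the first matching element, or None when nothing matches."""
    return next(elements_by_text(elements, text), None)
```

On a real page, an element's `.text` includes the visible text of its descendants, and `*` returns elements in document order, so the first match is typically an outer container such as `<html>` or `<body>` rather than the innermost element holding the text.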
Leon 2021-05-01 22:49:59 +02:00 committed by GitHub
parent 996ed01403
commit bc30d7623f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 665 additions and 387 deletions

README.md

@@ -1,131 +1,138 @@
 # undetected_chromedriver #
 https://github.com/ultrafunkamsterdam/undetected-chromedriver
 Optimized Selenium Chromedriver patch which does not trigger anti-bot services like Distill Network / Imperva / DataDome / Botprotect.io
 Automatically downloads the driver binary and patches it.
 * **Tested until current chrome beta versions**
 * **Works also on Brave Browser and many other Chromium based browsers**
 * **Python 3.6++**
 ## Installation ##
 ```
 pip install undetected-chromedriver
 ```
 ## Usage ##
 To prevent unnecessary hair-pulling and issue-raising, please mind the **[important note at the end of this document](#important-note) .**
 <br>
 #### The Version 2 way ####
 Literally, this is all you have to do. Settings are included and your browser executable found automagically.
 ```python
 import undetected_chromedriver.v2 as uc
 driver = uc.Chrome()
 with driver:
     driver.get('https://coinfaucet.eu') # known url using cloudflare's "under attack mode"
 ```
-#### the easy way (recommended) ####
+<br>
+<br>
+#### the easy way (v1 old stuff) ####
 ```python
 import undetected_chromedriver as uc
 driver = uc.Chrome()
 driver.get('https://distilnetworks.com')
 ```
-#### target specific chrome version ####
+#### target specific chrome version (v1 old stuff) ####
 ```python
 import undetected_chromedriver as uc
 uc.TARGET_VERSION = 85
 driver = uc.Chrome()
 ```
-#### monkeypatch mode ####
+#### monkeypatch mode (v1 old stuff) ####
 Needs to be done before importing from selenium package
 ```python
 import undetected_chromedriver as uc
 uc.install()
 from selenium.webdriver import Chrome
 driver = Chrome()
 driver.get('https://distilnetworks.com')
 ```
-#### the customized way ####
+#### the customized way (v1 old stuff) ####
 ```python
 import undetected_chromedriver as uc
 #specify chromedriver version to download and patch
 uc.TARGET_VERSION = 78
 # or specify your own chromedriver binary (why you would need this, i don't know)
 uc.install(
     executable_path='c:/users/user1/chromedriver.exe',
 )
 opts = uc.ChromeOptions()
 opts.add_argument(f'--proxy-server=socks5://127.0.0.1:9050')
 driver = uc.Chrome(options=opts)
 driver.get('https://distilnetworks.com')
 ```
-#### datadome.co example ####
+#### datadome.co example (v1 old stuff) ####
 These guys have actually a powerful product, and a link to this repo, which makes me wanna test their product.
 Make sure you use a "clean" ip for this one.
 ```python
 #
 # STANDARD selenium Chromedriver
 #
 from selenium import webdriver
 chrome = webdriver.Chrome()
 chrome.get('https://datadome.co/customers-stories/toppreise-ends-web-scraping-and-content-theft-with-datadome/')
 chrome.save_screenshot('datadome_regular_webdriver.png')
 True # it caused my ip to be flagged, unfortunately
 #
 # UNDETECTED chromedriver (headless,even)
 #
 import undetected_chromedriver as uc
 options = uc.ChromeOptions()
 options.headless=True
 options.add_argument('--headless')
 chrome = uc.Chrome(options=options)
 chrome.get('https://datadome.co/customers-stories/toppreise-ends-web-scraping-and-content-theft-with-datadome/')
 chrome.save_screenshot('datadome_undetected_webddriver.png')
 ```
 **Check both saved screenhots [here](https://imgur.com/a/fEmqadP)**
-## important note ##
+## important note (v1 old stuff) ####
 Due to the inner workings of the module, it is needed to browse programmatically (ie: using .get(url) ). Never use the gui to navigate. Using your keybord and mouse for navigation causes possible detection! New Tabs: same story. If you really need multi-tabs, then open the tab with the blank page (hint: url is `data:,` including comma, and yes, driver accepts it) and do your thing as usual. If you follow these "rules" (actually its default behaviour), then you will have a great time for now.
 TL;DR and for the visual-minded:
 ```python
 In [1]: import undetected_chromedriver as uc
 In [2]: driver = uc.Chrome()
 In [3]: driver.execute_script('return navigator.webdriver')
 Out[3]: True # Detectable
 In [4]: driver.get('https://distilnetworks.com') # starts magic
 In [4]: driver.execute_script('return navigator.webdriver')
 In [5]: None # Undetectable!
 ```
 ## end important note ##

@@ -12,29 +12,49 @@ Y88b. 888 888 888 Y88..88P 888 888 888 Y8b. Y88b 888 888 888 Y
 BY ULTRAFUNKAMSTERDAM (https://github.com/ultrafunkamsterdam)"""
 from setuptools import setup
+import os
+import re
+import codecs
+dirname = os.path.abspath(os.path.dirname(__file__))
+with codecs.open(
+    os.path.join(dirname, "undetected_chromedriver", "__init__.py"),
+    mode="r",
+    encoding="latin1",
+) as fp:
+    try:
+        version = re.findall(r"^__version__ = ['\"]([^'\"]*)['\"]", fp.read(), re.M)[0]
+    except Exception:
+        raise RuntimeError("unable to determine version")
 setup(
     name="undetected-chromedriver",
-    version="2.2.1",
+    version=version,
     packages=["undetected_chromedriver"],
-    install_requires=["selenium",],
+    install_requires=[
+        "selenium",
+    ],
     url="https://github.com/ultrafunkamsterdam/undetected-chromedriver",
     license="GPL-3.0",
     author="UltrafunkAmsterdam",
     author_email="info@blackhat-security.nl",
     description="""\
-selenium.webdriver.Chrome replacement with focus on stealth.
-not triggered by Distil / CloudFlare / Imperva / DataDome / hCaptcha and such.
+selenium.webdriver.Chrome replacement wiht compatiblity for Brave, and other Chromium baed browsers.
+not triggered by CloudFlare/Imperva/hCaptcha and such.
 NOTE: results may vary due to many factors. No guarantees are given, except for ongoing efforts in understanding detection algorithms.
 """,
-    long_description=open("README.md").read(),
+    long_description=open(os.path.join(dirname, "README.md")).read(),
     long_description_content_type="text/markdown",
     classifiers=[
         "License :: OSI Approved :: GNU General Public License v3 (GPLv3)",
         "Programming Language :: Python :: 3",
         "Programming Language :: Python :: 3.7",
+        "Programming Language :: Python :: 3.8",
+        "Programming Language :: Python :: 3.9",
+        "Programming Language :: Python :: 3.10",
     ],
 )
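The new `setup.py` no longer hard-codes the version; it reads `__version__` from `undetected_chromedriver/__init__.py` with a multiline regular expression, so the version is defined in one place. A minimal sketch of the same pattern applied to a string; `read_version` is a hypothetical helper and the sample source is made up for illustration.

```python
import re

# Same pattern as setup.py: a line starting with __version__ = "..." or '...'
# re.M lets "^" match at the start of every line, not only the first.
VERSION_RE = re.compile(r"^__version__ = ['\"]([^'\"]*)['\"]", re.M)


def read_version(source: str) -> str:
    """Extract the __version__ string from Python source text."""
    match = VERSION_RE.search(source)
    if match is None:
        raise RuntimeError("unable to determine version")
    return match.group(1)


sample = 'import logging\nlogger = logging.getLogger(__name__)\n__version__ = "2.2.7"\nTARGET_VERSION = 0\n'
print(read_version(sample))  # 2.2.7
```

Checking `search` for `None` is equivalent to the `findall(...)[0]` plus `except` in the diff, but only the missing-match case is turned into the `RuntimeError`.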

@@ -1,36 +0,0 @@
-import sys
-import os
-sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
-import time # noqa
-def test_undetected_chromedriver():
-    import undetected_chromedriver.v2 as uc
-    driver = uc.Chrome()
-    with driver:
-        driver.get("https://coinfaucet.eu")
-    time.sleep(4) # sleep only used for timing of screenshot
-    driver.save_screenshot("coinfaucet.eu.png")
-    with driver:
-        driver.get("https://cia.gov")
-    time.sleep(4) # sleep only used for timing of screenshot
-    driver.save_screenshot("cia.gov.png")
-    with driver:
-        driver.get("https://lhcdn.botprotect.io")
-    time.sleep(4) # sleep only used for timing of screenshot
-    driver.save_screenshot("notprotect.io.png")
-    with driver:
-        driver.get("https://www.datadome.co")
-    time.sleep(4) # sleep only used for timing of screenshot
-    driver.save_screenshot("datadome.co.png")
-test_undetected_chromedriver()

@@ -31,7 +31,7 @@ from selenium.webdriver import Chrome as _Chrome
 from selenium.webdriver import ChromeOptions as _ChromeOptions
 logger = logging.getLogger(__name__)
+__version__ = "2.2.7"
 TARGET_VERSION = 0

@@ -0,0 +1,66 @@
+import logging
+import os
+import sys
+import time # noqa
+from ..v2 import *
+logging.basicConfig(level=10)
+logger = logging.getLogger("TEST")
+logger.setLevel(20)
+JS_SERIALIZE_FUNCTION = """
+decycle=function(n,e){"use strict";var t=new WeakMap;return function n(o,r){var c,i;return void 0!==e&&(o=e(o)),"object"!=typeof o||null===o||o instanceof Boolean||o instanceof Date||o instanceof Number||o instanceof RegExp||o instanceof String?o:void 0!==(c=t.get(o))?{$ref:c}:(t.set(o,r),Array.isArray(o)?(i=[],o.forEach(function(e,t){i[t]=n(e,r+"["+t+"]")})):(i={},Object.keys(o).forEach(function(e){i[e]=n(o[e],r+"["+JSON.stringify(e)+"]")})),i)}(n,"$")};
+function replacer(t){try{if(Array.prototype.splice.call(t).length<100){let e={};for(let r in t)e[r]=t[r];return e}}catch(t){}}
+return decycle(window)
+"""
+def test_quick():
+    import undetected_chromedriver.v2 as uc
+    print("uc module: ", uc)
+    # options = selenium.webdriver.ChromeOptions()
+    options = uc.ChromeOptions()
+    options.add_argument("--user-data-dir=c:\\temp")
+    options.binary_location = uc.find_chrome_executable()
+    driver = uc.Chrome(
+        executable_path="./chromedriver.exe",
+        options=options,
+        service_log_path="c:\\temp\\service.log.txt",
+    )
+    while True:
+        sys.stdin.read()
+def test_undetected_chromedriver():
+    import undetected_chromedriver.v2 as uc
+    driver = uc.Chrome()
+    with driver:
+        driver.get("https://coinfaucet.eu")
+    time.sleep(4) # sleep only used for timing of screenshot
+    driver.save_screenshot("coinfaucet.eu.png")
+    with driver:
+        driver.get("https://cia.gov")
+    time.sleep(4) # sleep only used for timing of screenshot
+    driver.save_screenshot("cia.gov.png")
+    with driver:
+        driver.get("https://lhcdn.botprotect.io")
+    time.sleep(4) # sleep only used for timing of screenshot
+    driver.save_screenshot("notprotect.io.png")
+    with driver:
+        driver.get("https://www.datadome.co")
+    time.sleep(4) # sleep only used for timing of screenshot
+    driver.save_screenshot("datadome.co.png")
+# test_quick()
+# #test_undetected_chromedriver()

@@ -0,0 +1,36 @@
+import pytest
+from _pytest.fixtures import FixtureRequest
+import undetected_chromedriver.v2 as uc
+FAILED_SCREENSHOT_NAME = "failed.png"
+@pytest.fixture
+def head_uc(request: FixtureRequest):
+    request.instance.driver = uc.Chrome()
+    def teardown():
+        request.instance.driver.save_screenshot(FAILED_SCREENSHOT_NAME)
+        request.instance.driver.quit()
+    request.addfinalizer(teardown)
+    return request.instance.driver
+@pytest.fixture
+def headless_uc(request: FixtureRequest):
+    options = uc.ChromeOptions()
+    options.headless = True
+    request.instance.driver = uc.Chrome(options=options)
+    def teardown():
+        request.instance.driver.sapipve_screenshot(FAILED_SCREENSHOT_NAME)
+        request.instance.driver.quit()
+    request.addfinalizer(teardown)
+    return request.instance.driver
+pytest.main()

@@ -31,7 +31,6 @@ whats new:
 """
 from __future__ import annotations
 import io
@@ -44,11 +43,8 @@ import string
 import subprocess
 import sys
 import tempfile
-import threading
 import time
 import zipfile
-import atexit
-import contextlib
 from distutils.version import LooseVersion
 from urllib.request import urlopen, urlretrieve
@@ -56,21 +52,25 @@ import selenium.webdriver.chrome.service
 import selenium.webdriver.chrome.webdriver
 import selenium.webdriver.common.service
 import selenium.webdriver.remote.webdriver
+from selenium.webdriver.chrome.options import Options as _ChromeOptions
 __all__ = ("Chrome", "ChromeOptions", "Patcher", "find_chrome_executable")
 IS_POSIX = sys.platform.startswith(("darwin", "cygwin", "linux"))
 logger = logging.getLogger("uc")
+logger.setLevel(logging.getLogger().getEffectiveLevel())
 def find_chrome_executable():
     """
-    returns the full path to the chrome _browser binary
-    may not work if chrome is in a custom folder.
-    :return: path to chrome executable
-    :rtype: str
+    Finds the chrome, chrome beta, chrome canary, chromium executable
+    Returns
+    -------
+    executable_path : str
+        the full file path to found executable
     """
     candidates = set()
     if IS_POSIX:
@@ -97,114 +97,232 @@ def find_chrome_executable():
 class Chrome(object):
+    """
+    Controls the ChromeDriver and allows you to drive the browser.
+    The webdriver file will be downloaded by this module automatically,
+    you do not need to specify this. however, you may if you wish.
+    Attributes
+    ----------
+    Methods
+    -------
+    reconnect()
+        this can be useful in case of heavy detection methods
+        -stops the chromedriver service which runs in the background
+        -starts the chromedriver service which runs in the background
+        -recreate session
+    start_session(capabilities=None, browser_profile=None)
+        differentiates from the regular method in that it does not
+        require a capabilities argument. The capabilities are automatically
+        recreated from the options at creation time.
-    __doc__ = (
-        """\
     --------------------------------------------------------------------------
     NOTE:
         Chrome has everything included to work out of the box.
         it does not `need` customizations.
         any customizations MAY lead to trigger bot migitation systems.
     --------------------------------------------------------------------------
     """
-        + selenium.webdriver.remote.webdriver.WebDriver.__doc__
-    )
     _instances = set()
     def __init__(
         self,
-        executable_path="./chromedriver",
+        executable_path=None,
         port=0,
         options=None,
         service_args=None,
         desired_capabilities=None,
         service_log_path=None,
-        chrome_options=None,
         keep_alive=True,
-        debug_addr=None,
-        user_data_dir=None,
-        factor=1,
-        delay=2,
+        log_level=0,
+        headless=False,
         emulate_touch=False,
+        delay=5,
     ):
+        """
+        Creates a new instance of the chrome driver.
+        Starts the service and then creates new instance of chrome driver.
+        Parameters
+        ----------
+        executable_path: str, optional, default: None - use find_chrome_executable
+            Path to the executable. If the default is used it assumes the executable is in the $PATH
+        port: int, optional, default: 0
+            port you would like the service to run, if left as 0, a free port will be found.
+        options: ChromeOptions, optional, default: None - automatic useful defaults
+            this takes an instance of ChromeOptions, mainly to customize browser behavior.
+            anything other dan the default, for example extensions or startup options
+            are not supported in case of failure, and can probably lowers your undetectability.
+        service_args: list of str, optional, default: None
+            arguments to pass to the driver service
+        desired_capabilities: dict, optional, default: None - auto from config
+            Dictionary object with non-browser specific capabilities only, such as "proxy" or "loggingPref".
+        service_log_path: str, optional, default: None
+            path to log information from the driver.
+        keep_alive: bool, optional, default: True
+            Whether to configure ChromeRemoteConnection to use HTTP keep-alive.
+        log_level: int, optional, default: adapts to python global log level
+        headless: bool, optional, default: False
+            can also be specified in the options instance.
+            Specify whether you want to use the browser in headless mode.
+            warning: this lowers undetectability and not fully supported.
+        emulate_touch: bool, optional, default: False
+            if set to True, patches window.maxTouchPoints to always return non-zero
+        delay: int, optional, default: 5
+            delay in seconds to wait before giving back control.
+            this is used only when using the context manager
+            (`with` statement) to bypass, for example CloudFlare.
+            5 seconds is a foolproof value.
+        """
-        p = Patcher(target_path=executable_path)
-        p.auto(False)
-        self._patcher = p
-        self.factor = factor
-        self.delay = delay
-        self.port = port
-        self.process = None
-        self.browser_args = None
-        self._rcount = 0
-        self._rdiff = 10
-        try:
-            dbg = debug_addr.split(":")
-            debug_host, debug_port = str(dbg[0]), int(dbg[1])
-        except AttributeError:
-            debug_port = selenium.webdriver.common.service.utils.free_port()
-            debug_host = "127.0.0.1"
-        if not debug_addr:
-            debug_addr = f"{debug_host}:{debug_port}"
-        if not user_data_dir:
-            user_data_dir = os.path.normpath(tempfile.mkdtemp())
+        patcher = Patcher(executable_path=executable_path)
+        patcher.auto()
         if not options:
             options = selenium.webdriver.chrome.webdriver.Options()
+        try:
+            if options.session and options.session is not None:
+                # prevent reuse of options,
+                # as it just appends arguments, not replace them
+                # you'll get conflicts starting chrome
+                raise RuntimeError("you cannot reuse the ChromeOptions object")
+        except AttributeError:
+            pass
+        options.session = self
+        debug_port = selenium.webdriver.common.service.utils.free_port()
+        debug_host = "127.0.0.1"
         if not options.debugger_address:
-            options.debugger_address = debug_addr
+            options.debugger_address = "%s:%d" % (debug_host, debug_port)
+        options.add_argument("--remote-debugging-host=%s " % debug_host)
+        options.add_argument("--remote-debugging-port=%s" % debug_port)
+        # see if a custom user profile is specified
+        for arg in options.arguments:
+            if "user-data-dir" in arg:
+                m = re.search("(?:--)?user-data-dir(?:[ =])?(.*)", arg)
+                try:
+                    user_data_dir = m[1]
+                    logger.debug(
+                        "user-data-dir found in user argument %s => %s" % (arg, m[1])
+                    )
+                    keep_user_data_dir = True
+                    break
+                except IndexError:
+                    logger.debug(
+                        "no user data dir could be extracted from supplied argument %s "
+                        % arg
+                    )
+        else:
+            user_data_dir = os.path.normpath(tempfile.mkdtemp())
+            keep_user_data_dir = False
+            arg = "--user-data-dir=%s" % user_data_dir
+            options.add_argument(arg)
+            logger.debug(
+                "created a temporary folder in which the user-data (profile) will be stored during this\n"
+                "session, and added it to chrome startup arguments: %s" % arg
+            )
         if not options.binary_location:
             options.binary_location = find_chrome_executable()
+        self._delay = delay
+        self.user_data_dir = user_data_dir
+        self.keep_user_data_dir = keep_user_data_dir
+        if headless or options.headless:
+            options.headless = True
+            options.add_argument("--window-size=1920,1080")
+            options.add_argument("--start-maximized")
+        options.add_argument(
+            "--log-level=%d" % log_level
+            or divmod(logging.getLogger().getEffectiveLevel(), 10)[0]
+        )
+        # fix exit_type flag to prevent tab-restore nag
+        try:
+            with open(
+                os.path.join(user_data_dir, "Default/Preferences"),
+                encoding="latin1",
+                mode="r+",
+            ) as fs:
+                import json
+                config = json.load(fs)
+                if config["profile"]["exit_type"] is not None:
+                    # fixing the restore-tabs-nag
+                    config["profile"]["exit_type"] = None
+                fs.seek(0, 0)
+                fs.write(json.dumps(config, indent=4))
+                logger.debug("fixed exit_type flag")
+        except Exception as e:
+            logger.debug("did not find a bad exit_type flag ")
+        self.options = options
         if not desired_capabilities:
             desired_capabilities = options.to_capabilities()
-        self.options = options
-        self.user_data_dir = user_data_dir
-        extra_args = options.arguments
-        if options.headless:
-            extra_args.append("--headless")
-            extra_args.append("--window-size=1920,1080")
-        self.browser_args = [
-            find_chrome_executable(),
-            "--user-data-dir=%s" % user_data_dir,
-            "--remote-debugging-host=%s" % debug_host,
-            "--remote-debugging-port=%s" % debug_port,
-            "--log-level=%d" % divmod(logging.getLogger().getEffectiveLevel(), 10)[0],
-            *extra_args,
-        ]
+        # unlock_port(debug_port)
         self.browser = subprocess.Popen(
-            self.browser_args,
+            [options.binary_location, *options.arguments],
+            # close_fds="win32" in sys.platform,
             stdin=subprocess.PIPE,
             stdout=subprocess.PIPE,
             stderr=subprocess.PIPE,
         )
         self.webdriver = selenium.webdriver.chrome.webdriver.WebDriver(
-            executable_path=p.target_path,
+            executable_path=patcher.executable_path,
             port=port,
             options=options,
             service_args=service_args,
             desired_capabilities=desired_capabilities,
             service_log_path=service_log_path,
-            chrome_options=chrome_options,
             keep_alive=keep_alive,
         )
+        self.__class__._instances.add((self, options))
         if options.headless:
+            if emulate_touch:
+                self.execute_cdp_cmd(
+                    "Page.addScriptToEvaluateOnNewDocument",
+                    {
+                        "source": """
+                        Object.defineProperty(navigator, 'maxTouchPoints', {
+                                get: () => 1
+                        })"""
+                    },
+                )
             orig_get = self.webdriver.get
@@ -228,29 +346,29 @@ class Chrome(object):
                                 : target[key]
                             })
                         });
-                        Object.defineProperty(Notification, "permission", {
-                            configurable: true,
-                            enumerable: true,
-                            get: () => {
-                                return "unknown"
-                            },
-                        });
                         """
                     },
                 )
             logger.info("removing headless from user-agent string")
             self.execute_cdp_cmd(
                 "Network.setUserAgentOverride",
                 {
                     "userAgent": self.execute_script(
                         "return navigator.userAgent"
                     ).replace("Headless", "")
                 },
             )
-            logger.info("fixing notifications permission in headless browsers")
+            self.execute_cdp_cmd(
+                "Page.addScriptToEvaluateOnNewDocument",
+                {
+                    "source": """
+                    // fix Notification permission in headless mode
+                    Object.defineProperty(Notification, 'permission', { get: () => "default"});
+                    """
+                },
+            )
         if emulate_touch:
             self.execute_cdp_cmd(
@@ -278,50 +396,27 @@ class Chrome(object):
     def __dir__(self):
         return object.__dir__(self) + object.__dir__(self.webdriver)
+    def reconnect(self):
+        try:
+            self.service.stop()
+        except Exception as e:
+            logger.debug(e)
+        try:
+            self.service.start()
+        except Exception as e:
+            logger.debug(e)
+        try:
+            self.start_session()
+        except Exception as e:
+            logger.debug(e)
     def start_session(self, capabilities=None, browser_profile=None):
         if not capabilities:
             capabilities = self.options.to_capabilities()
         self.webdriver.start_session(capabilities, browser_profile)
-    def get_in(self, url: str, delay=2, factor=1):
-        """
-        :param url: str
-        :param delay: int
-        :param factor: disconnect <factor> seconds after .get()
-                       too low will disconnect before get() fired.
-        =================================================
-        In case you are being detected by some sophisticated
-        algorithm, and you are the kind that hates losing,
-        this might be your friend.
-        this currently works for hCaptcha based systems
-        (this includes CloudFlare!), and also passes many
-        custom setups (eg: ticketmaster.com),
-        Once you are past the first challenge, a cookie is saved
-        which (in my tests) also worked for other sites, and lasted
-        my entire session! However, to play safe, i'd recommend to just
-        call it once for every new site/domain you navigate to.
-        NOTE: mileage may vary!
-        bad behaviour can still be detected, and this program does not
-        magically "fix" a flagged ip.
-        please don't spam issues on github! first look if the issue
-        is not already reported.
-        """
-        try:
-            self.get(url)
-        finally:
-            self.service.stop()
-            # threading.Timer(factor or self.factor, self.close).start()
-            time.sleep(delay or self.delay)
-            self.service.start()
-            self.start_session()
     def quit(self):
         logger.debug("closing webdriver")
         try:
@@ -336,13 +431,20 @@ class Chrome(object):
             logger.debug(e, exc_info=True)
         except Exception:  # noqa
             pass
-        try:
-            logger.debug("removing profile : %s" % self.user_data_dir)
-            shutil.rmtree(self.user_data_dir, ignore_errors=False)
-        except PermissionError:
-            logger.debug("permission error. files are still in use/locked. retying...")
-            time.sleep(1)
-            self.quit()
+        if not self.keep_user_data_dir or self.keep_user_data_dir is False:
+            for _ in range(3):
+                try:
+                    logger.debug("removing profile : %s" % self.user_data_dir)
+                    shutil.rmtree(self.user_data_dir, ignore_errors=False)
+                except FileNotFoundError:
+                    pass
+                except PermissionError:
+                    logger.debug(
+                        "permission error. files are still in use/locked. retying..."
+                    )
+                else:
+                    break
+                time.sleep(1)
 
     def __del__(self):
         self.quit()
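The revised quit() in the hunk above replaces the recursive self.quit() retry with a bounded loop (three attempts, one second apart) and skips deletion entirely when keep_user_data_dir is set. A minimal standalone sketch of the same retry pattern follows; remove_dir_with_retries is a hypothetical helper, not part of undetected_chromedriver, and unlike the diff it returns as soon as the directory turns out to be missing instead of sleeping and trying again.

```python
import logging
import shutil
import time

logger = logging.getLogger(__name__)


def remove_dir_with_retries(path, attempts=3, wait=1.0):
    """Hypothetical helper: delete `path`, retrying while its files are locked.

    Returns True if the directory is gone afterwards, else False.
    """
    for _ in range(attempts):
        try:
            shutil.rmtree(path, ignore_errors=False)
        except FileNotFoundError:
            # Already gone (assumption: treat this as success, no retry needed).
            return True
        except PermissionError:
            # Files still held open, e.g. by a browser that is shutting down.
            logger.debug("files in %s are locked, retrying...", path)
        else:
            return True
        time.sleep(wait)
    return False
```

Bounding the attempts avoids the unbounded recursion of the removed code, where a permanently locked folder would make quit() call itself until the interpreter's recursion limit was hit.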
@@ -352,59 +454,121 @@ class Chrome(object):
     def __exit__(self, exc_type, exc_val, exc_tb):
         self.service.stop()
-        # threading.Timer(self.factor, self.service.start).start()
-        time.sleep(self.delay)
+        time.sleep(self._delay)
         self.service.start()
         self.start_session()
 
     def __hash__(self):
         return hash(self.options.debugger_address)
 
+    def find_elements_by_text(self, text: str):
+        for elem in self.find_elements_by_css_selector("*"):
+            try:
+                if text.lower() in elem.text.lower():
+                    yield elem
+            except Exception as e:
+                logger.debug("find_elements_by_text: %s" % e)
+
+    def find_element_by_text(self, text: str):
+        for elem in self.find_elements_by_css_selector("*"):
+            try:
+                if text.lower() in elem.text.lower():
+                    return elem
+            except Exception as e:
+                logger.debug("find_elements_by_text: %s" % e)
+
 
 class Patcher(object):
     url_repo = "https://chromedriver.storage.googleapis.com"
+    zip_name = "chromedriver_%s.zip"
+    exe_name = "chromedriver%s"
 
-    def __init__(
-        self, target_path="./chromedriver", force=False, version_main: int = 0
-    ):
-        if not IS_POSIX:
-            if not target_path[-4:] == ".exe":
-                target_path += ".exe"
+    platform = sys.platform
+    if platform.endswith("win32"):
+        zip_name %= "win32"
+        exe_name %= ".exe"
+    if platform.endswith("linux"):
+        zip_name %= "linux64"
+        exe_name %= ""
+    if platform.endswith("darwin"):
+        zip_name %= "mac64"
+        exe_name %= ""
+
+    if platform.endswith("win32"):
+        d = "~/appdata/roaming/undetected_chromedriver"
+    elif platform.startswith("linux"):
+        d = "~/.local/share/undetected_chromedriver"
+    elif platform.endswith("darwin"):
+        d = "~/Library/Application Support/undetected_chromedriver"
+    else:
+        d = "~/.undetected_chromedriver"
+    data_path = os.path.abspath(os.path.expanduser(d))
+
+    def __init__(self, executable_path=None, force=False, version_main: int = 0):
+        """
+        Args:
+            executable_path: None = automatic
+                             a full file path to the chromedriver executable
+            force: False
+                   terminate processes which are holding lock
+            version_main: 0 = auto
+                          specify main chrome version (rounded, ex: 82)
+        """
         self.force = force
-        z, e = self.get_package_name()
-        if not target_path:
-            target_path = e
-        self.exename = e
-        self.target_path = target_path
-        self.zipname = z
+        if not executable_path:
+            executable_path = os.path.join(self.data_path, self.exe_name)
+        if not IS_POSIX:
+            if not executable_path[-4:] == ".exe":
+                executable_path += ".exe"
+        self.zip_path = os.path.join(self.data_path, self.zip_name)
+        self.executable_path = os.path.abspath(os.path.join(".", executable_path))
         self.version_main = version_main
         self.version_full = None
 
-    def auto(self, force=False):
+    @classmethod
+    def auto(cls, executable_path=None, force=False):
+        """
+        Args:
+            force:
+        Returns:
+        """
+        i = cls(executable_path, force=force)
         try:
-            os.unlink(self.target_path)
+            os.unlink(i.executable_path)
         except PermissionError:
-            if force or self.force:
-                self.force_kill_instances()
-                return self.auto()
-            if self.verify_patch():
-                # assumes already running AND patched
-                return True
-            return False
+            if i.force:
+                cls.force_kill_instances(i.executable_path)
+                return i.auto(force=False)
+            try:
+                if i.is_binary_patched():
+                    # assumes already running AND patched
+                    return True
+            except PermissionError:
+                pass
+            # return False
         except FileNotFoundError:
             pass
-        release = self.fetch_release_number()
-        self.version_main = release.version[0]
-        self.version_full = release
-        self.fetch_package()
-        self.unzip_package()
+        release = i.fetch_release_number()
+        i.version_main = release.version[0]
+        i.version_full = release
+        i.unzip_package(i.fetch_package())
+        i.patch()
+        return i
+
+    def patch(self):
         self.patch_exe()
-        return self.verify_patch()
+        return self.is_binary_patched()
 
     def fetch_release_number(self):
         """
@@ -420,9 +584,9 @@ class Patcher(object):
         return LooseVersion(urlopen(self.url_repo + path).read().decode())
 
     def parse_exe_version(self):
-        with io.open(self.target_path, "rb") as f:
+        with io.open(self.executable_path, "rb") as f:
             for line in iter(lambda: f.readline(), b""):
-                match = re.search(br"platform_handle\x00content\x00([0-9\.]*)", line)
+                match = re.search(br"platform_handle\x00content\x00([0-9.]*)", line)
                 if match:
                     return LooseVersion(match[1].decode())
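parse_exe_version scans the chromedriver binary for the bytes platform_handle, NUL, content, NUL, followed by the version string. The only change to the pattern, [0-9\.] to [0-9.], is cosmetic: inside a character class a dot already matches only a literal dot. The sketch below applies the same pattern to an in-memory byte string. parse_version_bytes is a hypothetical name, and it returns a plain str rather than a LooseVersion, since distutils (which provides LooseVersion) was removed in Python 3.12.

```python
import re

# Same pattern as the diff; in a bytes regex "\x00" matches a NUL byte.
VERSION_RE = re.compile(rb"platform_handle\x00content\x00([0-9.]*)")


def parse_version_bytes(blob):
    """Hypothetical helper: return the embedded version string, or None."""
    match = VERSION_RE.search(blob)
    return match.group(1).decode() if match else None
```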
@@ -432,61 +596,44 @@ class Patcher(object):
         :return: path to downloaded file
         """
-        u = "%s/%s/%s" % (self.url_repo, self.version_full.vstring, self.zipname)
+        u = "%s/%s/%s" % (self.url_repo, self.version_full.vstring, self.zip_name)
         logger.debug("downloading from %s" % u)
-        zp, *_ = urlretrieve(u, filename=self.zipname)
-        return zp
+        # return urlretrieve(u, filename=self.data_path)[0]
+        return urlretrieve(u)[0]
 
-    def unzip_package(self):
+    def unzip_package(self, fp):
         """
         Does what it says
         :return: path to unpacked executable
         """
-        logger.debug("unzipping %s" % self.zipname)
+        logger.debug("unzipping %s" % fp)
         try:
-            os.makedirs(os.path.dirname(self.target_path), mode=0o755)
-        except OSError:
+            os.unlink(self.zip_path)
+        except (FileNotFoundError, OSError):
             pass
-        with zipfile.ZipFile(self.zipname, mode="r") as zf:
-            zf.extract(self.exename)
-        os.rename(self.exename, self.target_path)
-        os.remove(self.zipname)
-        os.chmod(self.target_path, 0o755)
-        return self.target_path
+        os.makedirs(self.data_path, mode=0o755, exist_ok=True)
+        with zipfile.ZipFile(fp, mode="r") as zf:
+            zf.extract(self.exe_name, os.path.dirname(self.executable_path))
+        os.remove(fp)
+        os.chmod(self.executable_path, 0o755)
+        return self.executable_path
 
     @staticmethod
-    def get_package_name():
-        """
-        returns a tuple of (zipname, exename) depending on platform.
-        :return: (zipname, exename)
-        """
-        zipname = "chromedriver_%s.zip"
-        exe = "chromedriver%s"
-        platform = sys.platform
-        if platform.endswith("win32"):
-            zipname %= "win32"
-            exe %= ".exe"
-        if platform.endswith("linux"):
-            zipname %= "linux64"
-            exe %= ""
-        if platform.endswith("darwin"):
-            zipname %= "mac64"
-            exe %= ""
-        return zipname, exe
-
-    def force_kill_instances(self):
+    def force_kill_instances(exe_name):
         """
         kills running instances.
+        :param: executable name to kill, may be a path as well
-        :param self:
         :return: True on success else False
         """
+        exe_name = os.path.basename(exe_name)
         if IS_POSIX:
-            r = os.system("kill -f -9 $(pidof %s)" % self.exename)
+            r = os.system("kill -f -9 $(pidof %s)" % exe_name)
         else:
-            r = os.system("taskkill /f /im %s" % self.exename)
+            r = os.system("taskkill /f /im %s" % exe_name)
         return not r
 
     @staticmethod
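After this change fetch_package returns the temporary file that urlretrieve(u)[0] downloads to, and unzip_package(fp) extracts exe_name into the directory of executable_path, deletes the archive and sets mode 0o755. The sketch below shows the extract-and-chmod step on its own; extract_member is a hypothetical helper. One deliberate difference: it uses the path returned by zipfile.ZipFile.extract instead of a separately computed one, which matters if a custom executable_path has a different file name than the archive member (the diff would then chmod a file that was never written).

```python
import os
import zipfile


def extract_member(zip_path, member, dest_dir):
    """Hypothetical helper: extract `member` into `dest_dir`, mark it executable."""
    os.makedirs(dest_dir, mode=0o755, exist_ok=True)
    with zipfile.ZipFile(zip_path, mode="r") as zf:
        # extract() returns the normalized path of the file it wrote.
        extracted = zf.extract(member, dest_dir)
    # chmod sets the mode explicitly, independent of the process umask.
    os.chmod(extracted, 0o755)
    return extracted
```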
@@ -497,19 +644,18 @@ class Patcher(object):
         cdc[3] = "_"
         return "".join(cdc).encode()
 
-    def verify_patch(self):
+    def is_binary_patched(self, executable_path=None):
         """simple check if executable is patched.
         :return: False if not patched, else True
         """
-        try:
-            with io.open(self.target_path, "rb") as fh:
-                for line in iter(lambda: fh.readline(), b""):
-                    if b"cdc_" in line:
-                        return False
-                return True
-        except FileNotFoundError:
-            return False
+        executable_path = executable_path or self.executable_path
+        with io.open(executable_path, "rb") as fh:
+            for line in iter(lambda: fh.readline(), b""):
+                if b"cdc_" in line:
+                    return False
+            else:
+                return True
 
     def patch_exe(self):
         """
@@ -517,12 +663,11 @@ class Patcher(object):
         :return: False on failure, binary name on success
         """
-        logger.info("patching driver executable %s" % self.target_path)
+        logger.info("patching driver executable %s" % self.executable_path)
         linect = 0
         replacement = self.gen_random_cdc()
-        with io.open(self.target_path, "r+b") as fh:
+        with io.open(self.executable_path, "r+b") as fh:
             for line in iter(lambda: fh.readline(), b""):
                 if b"cdc_" in line:
                     fh.seek(-len(line), 1)
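patch_exe walks the binary line by line and, for each line containing b"cdc_", seeks back to the start of that line before rewriting it; the lines in between this hunk and the next are not shown. The sketch below assumes the rewrite is a same-length substitution, so no other byte offsets in the executable move, and uses a hypothetical random_marker in place of gen_random_cdc, whose body is only partly visible here. It is an illustration of the technique, not the library's code.

```python
import io
import random
import string

MARKER = b"cdc_"


def random_marker():
    """Hypothetical stand-in for gen_random_cdc(): 3 random letters, then '_'."""
    chars = random.choices(string.ascii_letters, k=len(MARKER))
    chars[3] = "_"
    return "".join(chars).encode()


def patch_markers(path):
    """Overwrite each MARKER in `path` in place with a same-length random marker.

    Returns the number of lines that were rewritten.
    """
    replacement = random_marker()
    while replacement == MARKER:  # vanishingly rare, but keep the patch effective
        replacement = random_marker()
    rewritten = 0
    with io.open(path, "r+b") as fh:
        for line in iter(fh.readline, b""):
            if MARKER in line:
                # Step back to the start of this line and overwrite it with a
                # line of equal length, so no other bytes in the file move.
                fh.seek(-len(line), io.SEEK_CUR)
                fh.write(line.replace(MARKER, replacement))
                rewritten += 1
    return rewritten
```

Opening the file in "r+b" mode yields an io.BufferedRandom, which supports interleaving reads, relative seeks and writes on the same handle.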
@@ -531,6 +676,46 @@ class Patcher(object):
                     linect += 1
             return linect
 
+    def __repr__(self):
+        return "{0:s}({1:s})".format(
+            self.__class__.__name__,
+            self.executable_path,
+        )
+
 
-class ChromeOptions(selenium.webdriver.chrome.webdriver.Options):
-    pass
+#
+# def unlock_port(port):
+#     import os
+#     if not IS_POSIX:
+#         try:
+#
+#             c = subprocess.Popen('netstat -ano | findstr :%d' % port, shell=True, stdout=subprocess.PIPE,
+#                                  stderr=subprocess.PIPE)
+#             stdout, stderr = c.communicate()
+#             lines = stdout.splitlines()
+#             _pid = lines[0].split(b' ')[-1].decode()
+#             c = subprocess.Popen(['taskkill', '/f', '/pid', _pid], shell=True, stdout=subprocess.PIPE,
+#                                  stderr=subprocess.PIPE)
+#             stdout, stderr = c.communicate()
+#         except Exception as e:
+#             logger.debug(e)
+#
+#     else:
+#         try:
+#             os.system('kill -15 $(lsof -i:%d)' % port)
+#         except Exception:
+#             pass
+#
+
+
+class ChromeOptions(_ChromeOptions):
+    session = None
+
+    def add_extension_file_crx(self, extension=None):
+        if extension:
+            extension_to_add = os.path.abspath(os.path.expanduser(extension))
+            logger.debug("extension_to_add: %s" % extension_to_add)
+
+        return super().add_extension(r"%s" % extension)