undetected-chromedriver/undetected_chromedriver/v2.py

587 lines
19 KiB
Python
Raw Normal View History

2021-01-16 22:11:11 -07:00
#!/usr/bin/env python3
# this module is part of undetected_chromedriver
"""
V2 beta
what's new:
- currently this v2 module is available as an option.
  to use it / test it, you need to alter your imports by appending .v2
- headless mode is not (yet) supported in v2
example:
```python
import undetected_chromedriver.v2 as uc
driver = uc.Chrome()
driver.get('https://somewebsite.xyz')
# if site is protected by hCaptcha/Cloudflare
driver.get_in('https://cloudflareprotectedsite.xyz')
# if site is protected by hCaptcha/Cloudflare
# (different syntax, same function)
with driver:
    driver.get('https://cloudflareprotectedsite.xyz')
```
tests/example in ../tests/test_undetected_chromedriver.py
"""
from __future__ import annotations
import io
import logging
import os
import random
import re
import string
import subprocess
import sys
import tempfile
import time
import zipfile
2021-04-27 12:41:18 -06:00
import shutil
2021-01-16 22:11:11 -07:00
from distutils.version import LooseVersion
from urllib.request import urlopen, urlretrieve
2021-04-27 12:19:51 -06:00
2021-01-16 22:11:11 -07:00
import selenium.webdriver.chrome.service
import selenium.webdriver.chrome.webdriver
import selenium.webdriver.common.service
import selenium.webdriver.remote.webdriver
2021-04-27 12:19:51 -06:00
from selenium.webdriver.chrome.options import Options as _ChromeOptions
2021-01-16 22:11:11 -07:00
# Names exported by a star-import of this module.
__all__ = ("Chrome", "ChromeOptions", "Patcher", "find_chrome_executable")
2021-01-16 22:11:11 -07:00
# True when sys.platform starts with "darwin", "cygwin" or "linux";
# every other platform takes the non-POSIX (Windows-style) code paths below.
IS_POSIX = sys.platform.startswith(("darwin", "cygwin", "linux"))
# Module logger named "uc"; its level is copied once, at import time, from the
# root logger's effective level.
logger = logging.getLogger("uc")
logger.setLevel(logging.getLogger().getEffectiveLevel())
2021-01-16 22:11:11 -07:00
def find_chrome_executable():
    """
    Return the full path to a Chrome/Chromium browser binary, or None.

    On POSIX platforms every PATH entry is probed for ``google-chrome``,
    ``chromium`` and ``chromium-browser`` (plus the standard application
    bundle location on macOS). On other platforms the Chrome, Chrome Beta and
    Chrome Canary application folders below PROGRAMFILES, PROGRAMFILES(X86)
    and LOCALAPPDATA are probed.

    May not work if chrome is installed in a custom folder.

    :return: normalized path of the first candidate that is an executable
             file, or None when no candidate qualifies
    :rtype: str
    """
    # A list (instead of a set) keeps the probing order deterministic, so the
    # same browser is selected on every run when several are installed.
    candidates = []
    if IS_POSIX:
        # PATH may be unset; treat that as "no directories to search".
        for item in os.environ.get("PATH", "").split(os.pathsep):
            if not item:
                continue
            for subitem in ("google-chrome", "chromium", "chromium-browser"):
                candidates.append(os.sep.join((item, subitem)))
        if "darwin" in sys.platform:
            candidates.append(
                "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome"
            )
    else:
        for item in map(
            os.environ.get, ("PROGRAMFILES", "PROGRAMFILES(X86)", "LOCALAPPDATA")
        ):
            # A variable that is not defined yields None, which cannot be
            # joined into a path.
            if not item:
                continue
            for subitem in (
                "Google/Chrome/Application",
                "Google/Chrome Beta/Application",
                "Google/Chrome Canary/Application",
            ):
                candidates.append(os.sep.join((item, subitem, "chrome.exe")))

    for candidate in candidates:
        # isfile() rejects directories that merely share a candidate's name.
        if os.path.isfile(candidate) and os.access(candidate, os.X_OK):
            return os.path.normpath(candidate)
    return None
class Chrome(object):
    __doc__ = (
        """\
--------------------------------------------------------------------------
NOTE:
Chrome has everything included to work out of the box.
it does not `need` customizations.
any customizations MAY lead to trigger bot migitation systems.
--------------------------------------------------------------------------
"""
        # __doc__ is None when docstrings are stripped (python -OO).
        + (selenium.webdriver.remote.webdriver.WebDriver.__doc__ or "")
    )

    _instances = set()

    # Seconds __exit__ sleeps between stopping and restarting the driver
    # service. __exit__ reads self.delay but nothing ever assigned it, so every
    # `with driver:` block ended in AttributeError. The value mirrors the
    # `delay=2` default of the disabled get_in() helper kept further down.
    delay = 2

    def __init__(
        self,
        executable_path="./chromedriver",
        port=0,
        options=None,
        service_args=None,
        desired_capabilities=None,
        service_log_path=None,
        keep_alive=True,
        keep_user_data_dir=False,
        log_level=0,
        emulate_touch=False,
    ):
        """
        Patch the driver binary, launch the browser as a subprocess with remote
        debugging enabled and attach a selenium Chrome WebDriver to it.

        :param executable_path: chromedriver path handed to Patcher.auto()
        :param port: forwarded to the selenium WebDriver
        :param options: selenium Chrome options; a fresh instance is created
                        when omitted. Missing debugger address, binary location
                        and user-data-dir argument are filled in.
        :param service_args: forwarded to the selenium WebDriver
        :param desired_capabilities: forwarded to the selenium WebDriver;
                        defaults to options.to_capabilities()
        :param service_log_path: forwarded to the selenium WebDriver
        :param keep_alive: forwarded to the selenium WebDriver
        :param keep_user_data_dir: when true, the temporary profile folder
                        created here is not removed by quit(). A folder
                        supplied through a user-data-dir argument is always
                        kept.
        :param log_level: value for the browser's --log-level switch; 0 means
                        "root logger's effective level divided by 10"
        :param emulate_touch: in headless mode, additionally report
                        navigator.maxTouchPoints as 1
        """
        p = Patcher.auto(executable_path=executable_path)
        # p.auto(False)
        self._patcher = p

        self.port = port
        self.process = None
        self.browser_args = None
        self._rcount = 0
        self._rdiff = 10
        self.keep_user_data_dir = keep_user_data_dir

        debug_port = selenium.webdriver.common.service.utils.free_port()
        debug_host = "127.0.0.1"

        if not options:
            options = selenium.webdriver.chrome.webdriver.Options()
        if not options.debugger_address:
            options.debugger_address = "%s:%d" % (debug_host, debug_port)
        if not options.binary_location:
            options.binary_location = find_chrome_executable()
        if not desired_capabilities:
            desired_capabilities = options.to_capabilities()

        self.user_data_dir = self._resolve_user_data_dir(options)
        self.options = options
        extra_args = options.arguments

        if options.headless:
            extra_args.append("--headless")
            extra_args.append("--window-size=1920,1080")

        # `%` binds tighter than `or`: the original expression
        # `"--log-level=%d" % log_level or <fallback>` always produced the
        # formatted string, so the fallback was dead code. Pick the number
        # first, then format it.
        chrome_log_level = (
            log_level or divmod(logging.getLogger().getEffectiveLevel(), 10)[0]
        )
        self.browser_args = [
            options.binary_location,
            "--remote-debugging-host=%s" % debug_host,
            "--remote-debugging-port=%s" % debug_port,
            "--log-level=%d" % chrome_log_level,
            *extra_args,
        ]

        # NOTE(review): the three pipes are never read while the browser runs;
        # they are closed in quit().
        self.browser = subprocess.Popen(
            self.browser_args,
            # close_fds="win32" in sys.platform,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )

        self.webdriver = selenium.webdriver.chrome.webdriver.WebDriver(
            # executable_path=p.executable_path,
            port=port,
            options=options,
            service_args=service_args,
            desired_capabilities=desired_capabilities,
            service_log_path=service_log_path,
            keep_alive=keep_alive,
        )

        if options.headless:
            self._configure_headless(emulate_touch)

    def _resolve_user_data_dir(self, options):
        """Return the profile folder from `options`, creating a temp one if absent."""
        for arg in options.arguments:
            if "user-data-dir" not in arg:
                continue
            m = re.search("(?:--)?user-data-dir(?:[ =])?(.*)", arg)
            # An argument such as a bare "--user-data-dir" captures an empty
            # string; that is not a usable folder, so keep looking.
            if m and m[1]:
                logger.debug(
                    "user-data-dir found in user argument %s => %s" % (arg, m[1])
                )
                # A folder chosen by the caller is never deleted by quit().
                self.keep_user_data_dir = True
                return m[1]
            logger.debug(
                "no user data dir could be extracted from supplied argument %s "
                % arg
            )

        user_data_dir = os.path.normpath(tempfile.mkdtemp())
        # self.keep_user_data_dir keeps the value passed to __init__ here; it
        # used to be forced to False, which made the parameter a no-op.
        arg = "--user-data-dir=%s" % user_data_dir
        options.add_argument(arg)
        logger.debug(
            "created a temporary folder in which the user-data (profile) will be stored during this\n"
            "session, and added it to chrome startup arguments: %s" % arg
        )
        return user_data_dir

    def _configure_headless(self, emulate_touch):
        """Wrap webdriver.get so headless giveaways are patched before navigating."""
        orig_get = self.webdriver.get
        logger.info("setting properties for headless")

        def get_wrapped(*args, **kwargs):
            # Only inject the scripts while the page still reports
            # navigator.webdriver as truthy.
            if self.execute_script("return navigator.webdriver"):
                self.execute_cdp_cmd(
                    "Page.addScriptToEvaluateOnNewDocument",
                    {
                        "source": """
                            Object.defineProperty(window, 'navigator', {
                                value: new Proxy(navigator, {
                                    has: (target, key) => (key === 'webdriver' ? false : key in target),
                                    get: (target, key) =>
                                        key === 'webdriver'
                                            ? undefined
                                            : typeof target[key] === 'function'
                                            ? target[key].bind(target)
                                            : target[key]
                                })
                            });
                            Object.defineProperty(Notification, "permission", {
                                configurable: true,
                                enumerable: true,
                                get: () => {
                                    return "unknown"
                                },
                            });
                        """
                    },
                )
                logger.info("removing headless from user-agent string")
                self.execute_cdp_cmd(
                    "Network.setUserAgentOverride",
                    {
                        "userAgent": self.execute_script(
                            "return navigator.userAgent"
                        ).replace("Headless", "")
                    },
                )
                logger.info("fixing notifications permission in headless browsers")
                if emulate_touch:
                    self.execute_cdp_cmd(
                        "Page.addScriptToEvaluateOnNewDocument",
                        {
                            "source": """
                                Object.defineProperty(navigator, 'maxTouchPoints', {
                                    get: () => 1
                                })"""
                        },
                    )
            return orig_get(*args, **kwargs)

        self.webdriver.get = get_wrapped

    def __getattribute__(self, attr):
        """
        Look `attr` up on this object first and fall back to the wrapped
        selenium webdriver.

        :raises AttributeError: when neither object has the attribute
        """
        try:
            return object.__getattribute__(self, attr)
        except AttributeError:
            # Resolving the delegate through `self.webdriver` would re-enter
            # this method and recurse without bound whenever the webdriver has
            # not been assigned yet (e.g. __del__ after a failed __init__).
            if attr == "webdriver":
                raise
            try:
                delegate = object.__getattribute__(self, "webdriver")
            except AttributeError:
                delegate = None
            if delegate is None:
                raise
            return object.__getattribute__(delegate, attr)

    def __dir__(self):
        """List the attributes of this object followed by those of the webdriver."""
        return object.__dir__(self) + object.__dir__(self.webdriver)

    def start_session(self, capabilities=None, browser_profile=None):
        """Start a webdriver session, defaulting to the capabilities of self.options."""
        if not capabilities:
            capabilities = self.options.to_capabilities()
        self.webdriver.start_session(capabilities, browser_profile)

    # def get_in(self, url: str, delay=2, factor=1):
    #     """
    #     :param url: str
    #     :param delay: int
    #     :param factor: disconnect <factor> seconds after .get()
    #                    too low will disconnect before get() fired.
    #
    #     =================================================
    #
    #     In case you are being detected by some sophisticated
    #     algorithm, and you are the kind that hates losing,
    #     this might be your friend.
    #
    #     this currently works for hCaptcha based systems
    #     (this includes CloudFlare!), and also passes many
    #     custom setups (eg: ticketmaster.com),
    #
    #
    #     Once you are past the first challenge, a cookie is saved
    #     which (in my tests) also worked for other sites, and lasted
    #     my entire session! However, to play safe, i'd recommend to just
    #     call it once for every new site/domain you navigate to.
    #
    #     NOTE: mileage may vary!
    #     bad behaviour can still be detected, and this program does not
    #     magically "fix" a flagged ip.
    #
    #     please don't spam issues on github! first look if the issue
    #     is not already reported.
    #     """
    #     try:
    #         self.get(url)
    #     finally:
    #         self.service.stop()
    #         # threading.Timer(factor or self.factor, self.close).start()
    #         time.sleep(delay or self.delay)
    #         self.service.start()
    #         self.start_session()
    #

    def quit(self):
        """
        Close the webdriver, kill the browser process and, unless the profile
        is to be kept, remove the user-data folder.

        Safe to call more than once and on a partially initialised instance.
        """
        logger.debug("closing webdriver")
        try:
            self.webdriver.quit()
        except Exception:  # noqa
            # best effort: the driver may already be gone or was never created
            pass

        browser = getattr(self, "browser", None)
        if browser is not None:
            logger.debug("killing browser")
            try:
                browser.kill()
                browser.wait(1)
            except subprocess.TimeoutExpired as e:
                # Popen.wait() raises subprocess.TimeoutExpired, not the
                # builtin TimeoutError the original handler was written for.
                logger.debug(e, exc_info=True)
            except Exception:  # noqa
                pass
            # Release the pipes opened in __init__.
            for stream in (browser.stdin, browser.stdout, browser.stderr):
                if stream is None:
                    continue
                try:
                    stream.close()
                except OSError:
                    pass

        user_data_dir = getattr(self, "user_data_dir", None)
        if getattr(self, "keep_user_data_dir", True) or not user_data_dir:
            return

        for _ in range(3):
            try:
                logger.debug("removing profile : %s" % user_data_dir)
                shutil.rmtree(user_data_dir, ignore_errors=False)
            except FileNotFoundError:
                # Either the folder is already gone (nothing left to do) or an
                # entry vanished while the tree was being walked (retry).
                if not os.path.exists(user_data_dir):
                    break
            except PermissionError:
                logger.debug("permission error. files are still in use/locked. retying...")
            else:
                break
            time.sleep(1)

    def __del__(self):
        self.quit()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Stop the driver service, wait self.delay seconds, then reconnect."""
        self.service.stop()
        # threading.Timer(self.factor, self.service.start).start()
        time.sleep(self.delay)
        self.service.start()
        self.start_session()

    def __hash__(self):
        return hash(self.options.debugger_address)
class Patcher(object):
    """
    Downloads a chromedriver release and rewrites every ``cdc_`` marker in the
    binary with a random token of the same length.
    """

    url_repo = "https://chromedriver.storage.googleapis.com"
    zip_name = "chromedriver_%s.zip"
    exe_name = "chromedriver%s"

    # Fill in the platform-specific archive and executable names.
    # NOTE(review): on any other platform both templates keep their "%s".
    platform = sys.platform
    if platform.endswith("win32"):
        zip_name %= "win32"
        exe_name %= ".exe"
    if platform.endswith("linux"):
        zip_name %= "linux64"
        exe_name %= ""
    if platform.endswith("darwin"):
        zip_name %= "mac64"
        exe_name %= ""

    # Per-user folder used for the downloaded archive and, when no explicit
    # executable path is given, for the driver itself.
    if platform.endswith("win32"):
        d = "~/appdata/roaming/undetected_chromedriver"
    elif platform.startswith("linux"):
        d = "~/.local/share/undetected_chromedriver"
    elif platform.endswith("darwin"):
        d = "~/Library/Application Support/undetected_chromedriver"
    else:
        d = "~/.undetected_chromedriver"
    data_path = os.path.abspath(os.path.expanduser(d))

    def __init__(self, executable_path=None, force=False, version_main: int = 0):
        """
        Args:
            executable_path: None = automatic
                             a full file path to the chromedriver executable
            force: False
                    terminate processes which are holding lock
            version_main: 0 = auto
                specify main chrome version (rounded, ex: 82)
        """
        self.force = force

        if not executable_path:
            executable_path = os.path.join(self.data_path, self.exe_name)

        # Non-POSIX platforms need the ".exe" suffix on the driver name.
        if not IS_POSIX:
            if not executable_path[-4:] == ".exe":
                executable_path += ".exe"

        self.zip_path = os.path.join(self.data_path, self.zip_name)
        self.executable_path = os.path.abspath(os.path.join(".", executable_path))
        self.version_main = version_main
        self.version_full = None

    @classmethod
    def auto(cls, executable_path="./chromedriver", force=False):
        """
        Make sure a freshly patched driver exists at `executable_path`.

        Any existing file is removed, the latest release is downloaded,
        unpacked and patched. When the existing file is locked it is either
        released by killing the processes holding it (`force`) or, if it is
        already patched, reused as is.

        Args:
            executable_path: path of the chromedriver executable to manage
            force: kill running driver instances when the file is locked

        Returns:
            the Patcher instance describing the driver
        """
        i = cls(executable_path, force=force)
        try:
            os.unlink(i.executable_path)
        except PermissionError:
            if i.force:
                cls.force_kill_instances(i.executable_path)
                # Retry with the caller's path; the former `i.auto(force=False)`
                # silently fell back to the default "./chromedriver".
                return cls.auto(executable_path=executable_path, force=False)
            try:
                if i.is_binary_patched():
                    # assumes already running AND patched.
                    # Return the instance (not True) so that every code path
                    # hands the caller the same kind of object.
                    return i
            except PermissionError:
                pass
            # return False
        except FileNotFoundError:
            pass

        release = i.fetch_release_number()
        i.version_main = release.version[0]
        i.version_full = release
        i.unzip_package(i.fetch_package())
        i.patch()
        return i

    def patch(self):
        """Patch the executable and report whether it is free of ``cdc_`` markers."""
        self.patch_exe()
        return self.is_binary_patched()

    def fetch_release_number(self):
        """
        Gets the latest release number available, or the latest release of
        self.version_main if that is set explicitly.

        :return: version string
        :rtype: LooseVersion
        """
        path = "/latest_release"
        if self.version_main:
            path += f"_{self.version_main}"
        path = path.upper()
        logger.debug("getting release number from %s" % path)
        # Close the HTTP response instead of leaving it to the garbage
        # collector; strip whitespace so it cannot leak into the version.
        with urlopen(self.url_repo + path) as response:
            return LooseVersion(response.read().decode().strip())

    def parse_exe_version(self):
        """
        Read the version number embedded in the driver executable.

        :return: the version found, or None when the marker is absent
        :rtype: LooseVersion
        """
        version_marker = re.compile(br"platform_handle\x00content\x00([0-9.]*)")
        with io.open(self.executable_path, "rb") as f:
            for line in iter(lambda: f.readline(), b""):
                match = version_marker.search(line)
                if match:
                    return LooseVersion(match[1].decode())

    def fetch_package(self):
        """
        Downloads ChromeDriver from source

        :return: path to downloaded file
        """
        u = "%s/%s/%s" % (self.url_repo, self.version_full.vstring, self.zip_name)
        logger.debug("downloading from %s" % u)
        # return urlretrieve(u, filename=self.data_path)[0]
        return urlretrieve(u)[0]

    def unzip_package(self, fp):
        """
        Unpack the driver from the archive `fp` to self.executable_path,
        delete the archive and mark the driver as executable.

        :param fp: path of the downloaded zip archive
        :return: path to unpacked executable
        """
        logger.debug("unzipping %s" % fp)
        try:
            os.unlink(self.zip_path)
        except OSError:  # includes FileNotFoundError
            pass

        os.makedirs(self.data_path, mode=0o755, exist_ok=True)

        with zipfile.ZipFile(fp, mode="r") as zf:
            extracted = zf.extract(
                self.exe_name, os.path.dirname(self.executable_path)
            )
        # The archive member is always named exe_name. When the caller asked
        # for a different file name, move the driver there; otherwise the
        # chmod below fails with FileNotFoundError.
        if os.path.abspath(extracted) != os.path.abspath(self.executable_path):
            os.replace(extracted, self.executable_path)
        # os.rename(self.zip_path, self.executable_path)
        os.remove(fp)
        os.chmod(self.executable_path, 0o755)
        return self.executable_path

    @staticmethod
    def force_kill_instances(exe_name):
        """
        kills running instances.

        :param: executable name to kill, may be a path as well
        :return: True on success else False
        """
        exe_name = os.path.basename(exe_name)
        # Argument lists avoid interpolating a file name into a shell command
        # line. `kill -f -9 $(pidof ...)` was replaced because `-f` is not a
        # valid kill option on Linux or macOS.
        if IS_POSIX:
            command = ["pkill", "-9", "-x", exe_name]
        else:
            command = ["taskkill", "/f", "/im", exe_name]
        try:
            r = subprocess.call(command)
        except OSError:
            # the kill utility itself is not available
            return False
        return not r

    @staticmethod
    def gen_random_cdc():
        """
        Build the random 26-byte token that replaces a ``cdc_`` marker.

        The token is lowercase ascii with an underscore at index 3, the first
        letter repeated at index 2 and two uppercase letters at [-6:-4].

        :return: the replacement token
        :rtype: bytes
        """
        while True:
            cdc = random.choices(string.ascii_lowercase, k=26)
            cdc[-6:-4] = map(str.upper, cdc[-6:-4])
            cdc[2] = cdc[0]
            cdc[3] = "_"
            token = "".join(cdc).encode()
            # 1 in 676 draws spells "cdc_" again, which would leave the binary
            # looking unpatched; draw again in that case.
            if not token.startswith(b"cdc_"):
                return token

    def is_binary_patched(self, executable_path=None):
        """simple check if executable is patched.

        :param executable_path: file to inspect, defaults to self.executable_path
        :return: False if not patched, else True
        """
        executable_path = executable_path or self.executable_path
        with io.open(executable_path, "rb") as fh:
            for line in iter(lambda: fh.readline(), b""):
                if b"cdc_" in line:
                    return False
        return True

    def patch_exe(self):
        """
        Patches the ChromeDriver binary in place.

        Every ``cdc_`` marker followed by 22 more bytes on the same line is
        overwritten with one random token of identical length, so the file
        size and all offsets stay unchanged.

        :return: number of lines that contained a marker
        :rtype: int
        """
        logger.info("patching driver executable %s" % self.executable_path)

        linect = 0
        replacement = self.gen_random_cdc()
        marker = re.compile(b"cdc_.{22}")
        with io.open(self.executable_path, "r+b") as fh:
            for line in iter(lambda: fh.readline(), b""):
                if b"cdc_" in line:
                    # step back to the start of the line just read and
                    # overwrite it with the patched copy
                    fh.seek(-len(line), 1)
                    newline = marker.sub(replacement, line)
                    fh.write(newline)
                    linect += 1
        return linect
# class ChromeOptions(selenium.webdriver.chrome.webdriver.Options):
class ChromeOptions(_ChromeOptions):
    """Chrome options extended with a helper for adding an extension file."""

    def add_extension_file_crx(self, extension=None):
        """
        Register the extension file `extension` through add_extension().

        The expanded absolute path is only written to the debug log; the
        argument itself is what gets passed on.

        :param extension: path of the extension file; nothing happens when falsy
        :return: result of add_extension(), or None when no path was given
        """
        if not extension:
            return None
        resolved = os.path.abspath(os.path.expanduser(extension))
        logger.debug("extension_to_add: %s" % resolved)
        return super().add_extension(r"%s" % extension)