-----------
  this version is for test purposes only and contains breaking changes
  -----------

  - v2 is now the "main/default" module. usage:

        import undetected_chromedriver as uc
        driver = uc.Chrome()
        driver.get('https://nowsecure.nl')

  - The above is the README for this version. or use the regular instructions, but
    skip the `with` black magic and skip references to v2.
  - v1 moved to _compat for now.
  - fixed wrong dependancies
  - ~~~~ added "new" anti-detection mechanic ~~~~

  - the above ^^ makes all recent changes and additions obsolete
  - Removed ChromeOptions black magic to fix compatiblity issues

  - restored .get() to (near) original.
       - most changes from 3.0.4 to 3.0.6 are obsolete, as t
       - no `with` statements needed anymore, although it will still
         work for the sake of backward-compatibility.
       - no sleeps, stop-start-sessions, delays, or async cdp black magic!
       - this will solve a lot of other "issues" as well.
  - test success to date: 100%
  - just to mention it another time, since some people have hard time reading:
    headless is still WIP. Raising issues is needless
This commit is contained in:
UltrafunkAmsterdam 2021-12-16 05:53:41 +01:00
parent c1d02484d9
commit b60820a600
16 changed files with 1245 additions and 1720 deletions

2
.gitignore vendored
View File

@ -127,3 +127,5 @@ dmypy.json
# Pyre type checker
.pyre/
.idea

View File

@ -10,6 +10,35 @@ Automatically downloads the driver binary and patches it.
* Works also on Brave Browser and many other Chromium based browsers, some tweaking
* Python 3.6++**
### 3.1.0rc1 ####
**this version is for test purposes only and contains breaking changes**
- v2 is now the "main/default" module.
```python
import undetected_chromedriver as uc
driver = uc.Chrome()
driver.get('https://nowsecure.nl')
```
- The above is the README for this version. or use the regular instructions, but
skip the `with` black magic and skip references to v2.
- v1 moved to _compat for now.
- fixed wrong dependancies
- **~~~~ added "new" anti-detection mechanic ~~~~**
- the above ^^ makes all recent changes and additions obsolete
- Removed ChromeOptions black magic to fix compatiblity issues
- restored .get() to (near) original.
- most changes from 3.0.4 to 3.0.6 are obsolete, as t
- no `with` statements needed anymore, although it will still
work for the sake of backward-compatibility.
- no sleeps, stop-start-sessions, delays, or async cdp black magic!
- this will solve a lot of other "issues" as well.
- test success to date: 100%
- just to mention it another time, since some people have hard time reading:
**headless is still WIP. Raising issues is needless**
### 3.0.4 changes ####
- change process creation behavior to be fully detached

View File

@ -35,7 +35,7 @@ setup(
version=version,
packages=["undetected_chromedriver"],
install_requires=[
"selenium",
"selenium>=4.0.0",
"requests",
"websockets",
],

View File

@ -1,265 +1,726 @@
#!/usr/bin/env python3
"""
888 888 d8b
888 888 Y8P
888 888
.d8888b 88888b. 888d888 .d88b. 88888b.d88b. .d88b. .d88888 888d888 888 888 888 .d88b. 888d888
d88P" 888 "88b 888P" d88""88b 888 "888 "88b d8P Y8b d88" 888 888P" 888 888 888 d8P Y8b 888P"
888 888 888 888 888 888 888 888 888 88888888 888 888 888 888 Y88 88P 88888888 888
Y88b. 888 888 888 Y88..88P 888 888 888 Y8b. Y88b 888 888 888 Y8bd8P Y8b. 888
"Y8888P 888 888 888 "Y88P" 888 888 888 "Y8888 "Y88888 888 888 Y88P "Y8888 888 88888888
by UltrafunkAmsterdam (https://github.com/ultrafunkamsterdam)
"""
import io
import logging
import os
import random
import re
import string
import sys
import zipfile
from distutils.version import LooseVersion
from urllib.request import urlopen, urlretrieve
from selenium.webdriver import Chrome as _Chrome, ChromeOptions as _ChromeOptions
from . import v2
from . import cdp
from . import options
ChromeOptionsV2 = v2.ChromeOptions
logger = logging.getLogger(__name__)
__version__ = "3.0.6"
TARGET_VERSION = 0
class Chrome:
def __new__(cls, *args, emulate_touch=False, **kwargs):
if not ChromeDriverManager.installed:
ChromeDriverManager(*args, **kwargs).install()
if not ChromeDriverManager.selenium_patched:
ChromeDriverManager(*args, **kwargs).patch_selenium_webdriver()
if not kwargs.get("executable_path"):
kwargs["executable_path"] = "./{}".format(
ChromeDriverManager(*args, **kwargs).executable_path
)
if not kwargs.get("options"):
kwargs["options"] = ChromeOptions()
instance = object.__new__(_Chrome)
instance.__init__(*args, **kwargs)
instance._orig_get = instance.get
def _get_wrapped(*args, **kwargs):
if instance.execute_script("return navigator.webdriver"):
instance.execute_cdp_cmd(
"Page.addScriptToEvaluateOnNewDocument",
{
"source": """
Object.defineProperty(window, 'navigator', {
value: new Proxy(navigator, {
has: (target, key) => (key === 'webdriver' ? false : key in target),
get: (target, key) =>
key === 'webdriver'
? undefined
: typeof target[key] === 'function'
? target[key].bind(target)
: target[key]
})
});
"""
},
)
return instance._orig_get(*args, **kwargs)
instance.get = _get_wrapped
instance.get = _get_wrapped
instance.get = _get_wrapped
original_user_agent_string = instance.execute_script(
"return navigator.userAgent"
)
instance.execute_cdp_cmd(
"Network.setUserAgentOverride",
{
"userAgent": original_user_agent_string.replace("Headless", ""),
},
)
if emulate_touch:
instance.execute_cdp_cmd(
"Page.addScriptToEvaluateOnNewDocument",
{
"source": """
Object.defineProperty(navigator, 'maxTouchPoints', {
get: () => 1
})"""
},
)
logger.info(f"starting undetected_chromedriver.Chrome({args}, {kwargs})")
return instance
class ChromeOptions:
def __new__(cls, *args, **kwargs):
if not ChromeDriverManager.installed:
ChromeDriverManager(*args, **kwargs).install()
if not ChromeDriverManager.selenium_patched:
ChromeDriverManager(*args, **kwargs).patch_selenium_webdriver()
instance = object.__new__(_ChromeOptions)
instance.__init__()
instance.add_argument("start-maximized")
instance.add_experimental_option("excludeSwitches", ["enable-automation"])
instance.add_argument("--disable-blink-features=AutomationControlled")
return instance
class ChromeDriverManager(object):
installed = False
selenium_patched = False
target_version = None
DL_BASE = "https://chromedriver.storage.googleapis.com/"
def __init__(self, executable_path=None, target_version=None, *args, **kwargs):
_platform = sys.platform
if TARGET_VERSION:
# use global if set
self.target_version = TARGET_VERSION
if target_version:
# use explicitly passed target
self.target_version = target_version # user override
if not self.target_version:
# none of the above (default) and just get current version
self.target_version = self.get_release_version_number().version[
0
] # only major version int
self._base = base_ = "chromedriver{}"
exe_name = self._base
if _platform in ("win32",):
exe_name = base_.format(".exe")
if _platform in ("linux",):
_platform += "64"
exe_name = exe_name.format("")
if _platform in ("darwin",):
_platform = "mac64"
exe_name = exe_name.format("")
self.platform = _platform
self.executable_path = executable_path or exe_name
self._exe_name = exe_name
def patch_selenium_webdriver(self_):
"""
Patches selenium package Chrome, ChromeOptions classes for current session
:return:
"""
import selenium.webdriver.chrome.service
import selenium.webdriver
selenium.webdriver.Chrome = Chrome
selenium.webdriver.ChromeOptions = ChromeOptions
logger.info("Selenium patched. Safe to import Chrome / ChromeOptions")
self_.__class__.selenium_patched = True
def install(self, patch_selenium=True):
"""
Initialize the patch
This will:
download chromedriver if not present
patch the downloaded chromedriver
patch selenium package if <patch_selenium> is True (default)
:param patch_selenium: patch selenium webdriver classes for Chrome and ChromeDriver (for current python session)
:return:
"""
if not os.path.exists(self.executable_path):
self.fetch_chromedriver()
if not self.__class__.installed:
if self.patch_binary():
self.__class__.installed = True
if patch_selenium:
self.patch_selenium_webdriver()
def get_release_version_number(self):
"""
Gets the latest major version available, or the latest major version of self.target_version if set explicitly.
:return: version string
"""
path = (
"LATEST_RELEASE"
if not self.target_version
else f"LATEST_RELEASE_{self.target_version}"
)
return LooseVersion(urlopen(self.__class__.DL_BASE + path).read().decode())
def fetch_chromedriver(self):
"""
Downloads ChromeDriver from source and unpacks the executable
:return: on success, name of the unpacked executable
"""
base_ = self._base
zip_name = base_.format(".zip")
ver = self.get_release_version_number().vstring
if os.path.exists(self.executable_path):
return self.executable_path
urlretrieve(
f"{self.__class__.DL_BASE}{ver}/{base_.format(f'_{self.platform}')}.zip",
filename=zip_name,
)
with zipfile.ZipFile(zip_name) as zf:
zf.extract(self._exe_name)
os.remove(zip_name)
if sys.platform != "win32":
os.chmod(self._exe_name, 0o755)
return self._exe_name
@staticmethod
def random_cdc():
cdc = random.choices(string.ascii_lowercase, k=26)
cdc[-6:-4] = map(str.upper, cdc[-6:-4])
cdc[2] = cdc[0]
cdc[3] = "_"
return "".join(cdc).encode()
def patch_binary(self):
"""
Patches the ChromeDriver binary
:return: False on failure, binary name on success
"""
linect = 0
replacement = self.random_cdc()
with io.open(self.executable_path, "r+b") as fh:
for line in iter(lambda: fh.readline(), b""):
if b"cdc_" in line:
fh.seek(-len(line), 1)
newline = re.sub(b"cdc_.{22}", replacement, line)
fh.write(newline)
linect += 1
return linect
def install(executable_path=None, target_version=None, *args, **kwargs):
ChromeDriverManager(executable_path, target_version, *args, **kwargs).install()
#!/usr/bin/env python3
from __future__ import annotations
"""
888 888 d8b
888 888 Y8P
888 888
.d8888b 88888b. 888d888 .d88b. 88888b.d88b. .d88b. .d88888 888d888 888 888 888 .d88b. 888d888
d88P" 888 "88b 888P" d88""88b 888 "888 "88b d8P Y8b d88" 888 888P" 888 888 888 d8P Y8b 888P"
888 888 888 888 888 888 888 888 888 88888888 888 888 888 888 Y88 88P 88888888 888
Y88b. 888 888 888 Y88..88P 888 888 888 Y8b. Y88b 888 888 888 Y8bd8P Y8b. 888
"Y8888P 888 888 888 "Y88P" 888 888 888 "Y8888 "Y88888 888 888 Y88P "Y8888 888 88888888
by UltrafunkAmsterdam (https://github.com/ultrafunkamsterdam)
"""
__version__ = "3.1.0rc1"
import asyncio
import json
import logging
import os
import re
import shutil
import sys
import tempfile
import time
import inspect
import requests
import selenium.webdriver.chrome.service
import selenium.webdriver.chrome.webdriver
import selenium.webdriver.common.service
import selenium.webdriver.remote.webdriver
import websockets
from .cdp import CDP
from .options import ChromeOptions
from .patcher import IS_POSIX
from .patcher import Patcher
from .reactor import Reactor
__all__ = (
"Chrome",
"ChromeOptions",
"Patcher",
"Reactor",
"CDP",
"find_chrome_executable",
)
logger = logging.getLogger("uc")
logger.setLevel(logging.getLogger().getEffectiveLevel())
from .dprocess import start_detached
class Chrome(selenium.webdriver.chrome.webdriver.WebDriver):
"""
Controls the ChromeDriver and allows you to drive the browser.
The webdriver file will be downloaded by this module automatically,
you do not need to specify this. however, you may if you wish.
Attributes
----------
Methods
-------
reconnect()
this can be useful in case of heavy detection methods
-stops the chromedriver service which runs in the background
-starts the chromedriver service which runs in the background
-recreate session
start_session(capabilities=None, browser_profile=None)
differentiates from the regular method in that it does not
require a capabilities argument. The capabilities are automatically
recreated from the options at creation time.
--------------------------------------------------------------------------
NOTE:
Chrome has everything included to work out of the box.
it does not `need` customizations.
any customizations MAY lead to trigger bot migitation systems.
--------------------------------------------------------------------------
"""
_instances = set()
session_id = None
def __init__(
self,
executable_path=None,
port=0,
options=None,
enable_cdp_events=False,
service_args=None,
desired_capabilities=None,
service_log_path=None,
keep_alive=False,
log_level=0,
headless=False,
delay=5,
version_main=None,
patcher_force_close=False,
):
"""
Creates a new instance of the chrome driver.
Starts the service and then creates new instance of chrome driver.
Parameters
----------
executable_path: str, optional, default: None - use find_chrome_executable
Path to the executable. If the default is used it assumes the executable is in the $PATH
port: int, optional, default: 0
port you would like the service to run, if left as 0, a free port will be found.
options: ChromeOptions, optional, default: None - automatic useful defaults
this takes an instance of ChromeOptions, mainly to customize browser behavior.
anything other dan the default, for example extensions or startup options
are not supported in case of failure, and can probably lowers your undetectability.
enable_cdp_events: bool, default: False
:: currently for chrome only
this enables the handling of wire messages
when enabled, you can subscribe to CDP events by using:
driver.add_cdp_listener("Network.dataReceived", yourcallback)
# yourcallback is an callable which accepts exactly 1 dict as parameter
service_args: list of str, optional, default: None
arguments to pass to the driver service
desired_capabilities: dict, optional, default: None - auto from config
Dictionary object with non-browser specific capabilities only, such as "item" or "loggingPref".
service_log_path: str, optional, default: None
path to log information from the driver.
keep_alive: bool, optional, default: True
Whether to configure ChromeRemoteConnection to use HTTP keep-alive.
log_level: int, optional, default: adapts to python global log level
headless: bool, optional, default: False
can also be specified in the options instance.
Specify whether you want to use the browser in headless mode.
warning: this lowers undetectability and not fully supported.
delay: int, optional, default: 5
delay in seconds to wait before giving back control.
this is used only when using the context manager
(`with` statement) to bypass, for example CloudFlare.
5 seconds is a foolproof value.
version_main: int, optional, default: None (=auto)
if you, for god knows whatever reason, use
an older version of Chrome. You can specify it's full rounded version number
here. Example: 87 for all versions of 87
patcher_force_close: bool, optional, default: False
instructs the patcher to do whatever it can to access the chromedriver binary
if the file is locked, it will force shutdown all instances.
setting it is not recommended, unless you know the implications and think
you might need it.
"""
patcher = Patcher(
executable_path=executable_path,
force=patcher_force_close,
version_main=version_main,
)
patcher.auto()
if not options:
options = ChromeOptions()
try:
if hasattr(options, "_session") and options._session is not None:
# prevent reuse of options,
# as it just appends arguments, not replace them
# you'll get conflicts starting chrome
raise RuntimeError("you cannot reuse the ChromeOptions object")
except AttributeError:
pass
options._session = self
debug_port = selenium.webdriver.common.service.utils.free_port()
debug_host = "127.0.0.1"
if not options.debugger_address:
options.debugger_address = "%s:%d" % (debug_host, debug_port)
if enable_cdp_events:
options.set_capability(
"goog:loggingPrefs", {"performance": "ALL", "browser": "ALL"}
)
options.add_argument("--remote-debugging-host=%s" % debug_host)
options.add_argument("--remote-debugging-port=%s" % debug_port)
user_data_dir, language, keep_user_data_dir = None, None, None
# see if a custom user profile is specified
for arg in options.arguments:
if "lang" in arg:
m = re.search("(?:--)?lang(?:[ =])?(.*)", arg)
try:
language = m[1]
except IndexError:
logger.debug("will set the language to en-US,en;q=0.9")
language = "en-US,en;q=0.9"
if "user-data-dir" in arg:
m = re.search("(?:--)?user-data-dir(?:[ =])?(.*)", arg)
try:
user_data_dir = m[1]
logger.debug(
"user-data-dir found in user argument %s => %s" % (arg, m[1])
)
keep_user_data_dir = True
except IndexError:
logger.debug(
"no user data dir could be extracted from supplied argument %s "
% arg
)
if not user_data_dir:
if options.user_data_dir:
options.add_argument("--user-data-dir=%s" % options.user_data_dir)
keep_user_data_dir = True
logger.debug(
"user_data_dir property found in options object: %s" % user_data_dir
)
else:
user_data_dir = os.path.normpath(tempfile.mkdtemp())
keep_user_data_dir = False
arg = "--user-data-dir=%s" % user_data_dir
options.add_argument(arg)
logger.debug(
"created a temporary folder in which the user-data (profile) will be stored during this\n"
"session, and added it to chrome startup arguments: %s" % arg
)
if not language:
try:
import locale
language = locale.getdefaultlocale()[0].replace("_", "-")
except Exception:
pass
if not language:
language = "en-US"
options.add_argument("--lang=%s" % language)
if not options.binary_location:
options.binary_location = find_chrome_executable()
self._delay = delay
self.user_data_dir = user_data_dir
self.keep_user_data_dir = keep_user_data_dir
if headless or options.headless:
options.headless = True
options.add_argument("--window-size=1920,1080")
options.add_argument("--start-maximized")
options.add_argument("--no-sandbox")
# fixes "could not connect to chrome" error when running
# on linux using privileged user like root (which i don't recommend)
options.add_argument(
"--log-level=%d" % log_level
or divmod(logging.getLogger().getEffectiveLevel(), 10)[0]
)
# fix exit_type flag to prevent tab-restore nag
try:
with open(
os.path.join(user_data_dir, "Default/Preferences"),
encoding="latin1",
mode="r+",
) as fs:
config = json.load(fs)
if config["profile"]["exit_type"] is not None:
# fixing the restore-tabs-nag
config["profile"]["exit_type"] = None
fs.seek(0, 0)
json.dump(config, fs)
logger.debug("fixed exit_type flag")
except Exception as e:
logger.debug("did not find a bad exit_type flag ")
self.options = options
if not desired_capabilities:
desired_capabilities = options.to_capabilities()
self.browser_pid = start_detached(options.binary_location, *options.arguments)
# self.browser = subprocess.Popen(
# [options.binary_location, *options.arguments],
# stdin=subprocess.PIPE,
# stdout=subprocess.PIPE,
# stderr=subprocess.PIPE,
# close_fds=IS_POSIX,
# )
super(Chrome, self).__init__(
executable_path=patcher.executable_path,
port=port,
options=options,
service_args=service_args,
desired_capabilities=desired_capabilities,
service_log_path=service_log_path,
keep_alive=keep_alive,
)
# intentional
# self.webdriver = selenium.webdriver.chrome.webdriver.WebDriver(
# executable_path=patcher.executable_path,
# port=port,
# options=options,
# service_args=service_args,
# desired_capabilities=desired_capabilities,
# service_log_path=service_log_path,
# keep_alive=keep_alive,
# )
self.reactor = None
if enable_cdp_events:
if logging.getLogger().getEffectiveLevel() == logging.DEBUG:
logging.getLogger(
"selenium.webdriver.remote.remote_connection"
).setLevel(20)
reactor = Reactor(self)
reactor.start()
self.reactor = reactor
if options.headless:
self._configure_headless()
orig_get = self.get
def _configure_headless(self):
orig_get = self.get
logger.info("setting properties for headless")
def get_wrapped(*args, **kwargs):
if self.execute_script("return navigator.webdriver"):
logger.info("patch navigator.webdriver")
self.execute_cdp_cmd(
"Page.addScriptToEvaluateOnNewDocument",
{
"source": """
Object.defineProperty(window, 'navigator', {
value: new Proxy(navigator, {
has: (target, key) => (key === 'webdriver' ? false : key in target),
get: (target, key) =>
key === 'webdriver' ?
false :
typeof target[key] === 'function' ?
target[key].bind(target) :
target[key]
})
});
"""
},
)
logger.info("patch user-agent string")
self.execute_cdp_cmd(
"Network.setUserAgentOverride",
{
"userAgent": self.execute_script(
"return navigator.userAgent"
).replace("Headless", "")
},
)
if self.options.mock_permissions:
logger.info("patch permissions api")
self.execute_cdp_cmd(
"Page.addScriptToEvaluateOnNewDocument",
{
"source": """
// fix Notification permission in headless mode
Object.defineProperty(Notification, 'permission', { get: () => "default"});
"""
},
)
if self.options.emulate_touch:
logger.info("patch emulate touch")
self.execute_cdp_cmd(
"Page.addScriptToEvaluateOnNewDocument",
{
"source": """
Object.defineProperty(navigator, 'maxTouchPoints', {
get: () => 1
})"""
},
)
if self.options.mock_canvas_fp:
logger.info("patch HTMLCanvasElement fingerprinting")
self.execute_cdp_cmd(
"Page.addScriptToEvaluateOnNewDocument",
{
"source": """
(function() {
const ORIGINAL_CANVAS = HTMLCanvasElement.prototype[name];
Object.defineProperty(HTMLCanvasElement.prototype, name, {
"value": function() {
var shift = {
'r': Math.floor(Math.random() * 10) - 5,
'g': Math.floor(Math.random() * 10) - 5,
'b': Math.floor(Math.random() * 10) - 5,
'a': Math.floor(Math.random() * 10) - 5
};
var width = this.width,
height = this.height,
context = this.getContext("2d");
var imageData = context.getImageData(0, 0, width, height);
for (var i = 0; i < height; i++) {
for (var j = 0; j < width; j++) {
var n = ((i * (width * 4)) + (j * 4));
imageData.data[n + 0] = imageData.data[n + 0] + shift.r;
imageData.data[n + 1] = imageData.data[n + 1] + shift.g;
imageData.data[n + 2] = imageData.data[n + 2] + shift.b;
imageData.data[n + 3] = imageData.data[n + 3] + shift.a;
}
}
context.putImageData(imageData, 0, 0);
return ORIGINAL_CANVAS.apply(this, arguments);
}
});
})(this)
"""
},
)
if self.options.mock_chrome_global:
self.execute_cdp_cmd(
"Page.addScriptToEvaluateOnNewDocument",
{
"source": """
Object.defineProperty(window, 'chrome', {
value: new Proxy(window.chrome, {
has: (target, key) => true,
get: (target, key) => {
return {
app: {
isInstalled: false,
},
webstore: {
onInstallStageChanged: {},
onDownloadProgress: {},
},
runtime: {
PlatformOs: {
MAC: 'mac',
WIN: 'win',
ANDROID: 'android',
CROS: 'cros',
LINUX: 'linux',
OPENBSD: 'openbsd',
},
PlatformArch: {
ARM: 'arm',
X86_32: 'x86-32',
X86_64: 'x86-64',
},
PlatformNaclArch: {
ARM: 'arm',
X86_32: 'x86-32',
X86_64: 'x86-64',
},
RequestUpdateCheckStatus: {
THROTTLED: 'throttled',
NO_UPDATE: 'no_update',
UPDATE_AVAILABLE: 'update_available',
},
OnInstalledReason: {
INSTALL: 'install',
UPDATE: 'update',
CHROME_UPDATE: 'chrome_update',
SHARED_MODULE_UPDATE: 'shared_module_update',
},
OnRestartRequiredReason: {
APP_UPDATE: 'app_update',
OS_UPDATE: 'os_update',
PERIODIC: 'periodic',
},
},
}
}
})
});
"""
},
)
return orig_get(*args, **kwargs)
self.get = get_wrapped
def __dir__(self):
return object.__dir__(self)
def _get_cdc_props(self):
return self.execute_script(
"""
let objectToInspect = window,
result = [];
while(objectToInspect !== null)
{ result = result.concat(Object.getOwnPropertyNames(objectToInspect));
objectToInspect = Object.getPrototypeOf(objectToInspect); }
return result.filter(i => i.match(/.+_.+_(Array|Promise|Symbol)/ig))
"""
)
def _hook_remove_cdc_props(self):
self.execute_cdp_cmd(
"Page.addScriptToEvaluateOnNewDocument",
{
"source": """
let objectToInspect = window,
result = [];
while(objectToInspect !== null)
{ result = result.concat(Object.getOwnPropertyNames(objectToInspect));
objectToInspect = Object.getPrototypeOf(objectToInspect); }
result.forEach(p => p.match(/.+_.+_(Array|Promise|Symbol)/ig)
&&delete window[p]&&console.log('removed',p))
"""
},
)
def get(self, url):
if self._get_cdc_props():
self._hook_remove_cdc_props()
return super().get(url)
def add_cdp_listener(self, event_name, callback):
if (
self.reactor
and self.reactor is not None
and isinstance(self.reactor, Reactor)
):
self.reactor.add_event_handler(event_name, callback)
return self.reactor.handlers
return False
def clear_cdp_listeners(self):
if self.reactor and isinstance(self.reactor, Reactor):
self.reactor.handlers.clear()
def tab_new(self, url: str):
"""
this opens a url in a new tab.
apparently, that passes all tests directly!
Parameters
----------
url
Returns
-------
"""
if not hasattr(self, "cdp"):
from .cdp import CDP
self.cdp = CDP(self.options)
self.cdp.tab_new(url)
def reconnect(self, timeout=0.1):
try:
self.service.stop()
except Exception as e:
logger.debug(e)
time.sleep(timeout)
try:
self.service.start()
except Exception as e:
logger.debug(e)
try:
self.start_session()
except Exception as e:
logger.debug(e)
def start_session(self, capabilities=None, browser_profile=None):
if not capabilities:
capabilities = self.options.to_capabilities()
super(Chrome, self).start_session(capabilities, browser_profile)
def quit(self):
logger.debug("closing webdriver")
self.service.process.kill()
try:
if self.reactor and isinstance(self.reactor, Reactor):
self.reactor.event.set()
except Exception: # noqa
pass
try:
logger.debug("killing browser")
os.kill(self.browser_pid)
# self.browser.terminate()
# self.browser.wait(1)
except TimeoutError as e:
logger.debug(e, exc_info=True)
except Exception: # noqa
pass
if (
hasattr(self, "keep_user_data_dir")
and hasattr(self, "user_data_dir")
and not self.keep_user_data_dir
):
for _ in range(5):
try:
logger.debug("removing profile : %s" % self.user_data_dir)
shutil.rmtree(self.user_data_dir, ignore_errors=False)
except FileNotFoundError:
pass
except PermissionError:
logger.debug(
"permission error. files are still in use/locked. retying..."
)
except (RuntimeError, OSError) as e:
logger.debug("%s retying..." % e)
else:
break
time.sleep(0.1)
def __del__(self):
try:
self.service.process.kill()
except:
pass
self.quit()
def __enter__(self):
try:
curframe = inspect.currentframe()
callframe = inspect.getouterframes(curframe, 2)
caller = callframe[1][3]
logging.getLogger(__name__).debug("__enter__ caller: %s" % caller)
if caller == "get":
return
except (AttributeError, ValueError, KeyError, OSError) as e:
logging.getLogger(__name__).debug(e)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.service.stop()
time.sleep(self._delay)
self.service.start()
self.start_session()
def __hash__(self):
return hash(self.options.debugger_address)
def find_chrome_executable():
"""
Finds the chrome, chrome beta, chrome canary, chromium executable
Returns
-------
executable_path : str
the full file path to found executable
"""
candidates = set()
if IS_POSIX:
for item in os.environ.get("PATH").split(os.pathsep):
for subitem in ("google-chrome", "chromium", "chromium-browser"):
candidates.add(os.sep.join((item, subitem)))
if "darwin" in sys.platform:
candidates.update(
["/Applications/Google Chrome.app/Contents/MacOS/Google Chrome"]
)
else:
for item in map(
os.environ.get, ("PROGRAMFILES", "PROGRAMFILES(X86)", "LOCALAPPDATA")
):
for subitem in (
"Google/Chrome/Application",
"Google/Chrome Beta/Application",
"Google/Chrome Canary/Application",
):
candidates.add(os.sep.join((item, subitem, "chrome.exe")))
for candidate in candidates:
if os.path.exists(candidate) and os.access(candidate, os.X_OK):
return os.path.normpath(candidate)

View File

@ -0,0 +1,259 @@
#!/usr/bin/env python3
# this module is part of undetected_chromedriver
"""
888 888 d8b
888 888 Y8P
888 888
.d8888b 88888b. 888d888 .d88b. 88888b.d88b. .d88b. .d88888 888d888 888 888 888 .d88b. 888d888
d88P" 888 "88b 888P" d88""88b 888 "888 "88b d8P Y8b d88" 888 888P" 888 888 888 d8P Y8b 888P"
888 888 888 888 888 888 888 888 888 88888888 888 888 888 888 Y88 88P 88888888 888
Y88b. 888 888 888 Y88..88P 888 888 888 Y8b. Y88b 888 888 888 Y8bd8P Y8b. 888
"Y8888P 888 888 888 "Y88P" 888 888 888 "Y8888 "Y88888 888 888 Y88P "Y8888 888 88888888
by UltrafunkAmsterdam (https://github.com/ultrafunkamsterdam)
"""
import io
import logging
import os
import random
import re
import string
import sys
import zipfile
from distutils.version import LooseVersion
from urllib.request import urlopen, urlretrieve
from selenium.webdriver import Chrome as _Chrome, ChromeOptions as _ChromeOptions
TARGET_VERSION = 0
logger = logging.getLogger("uc")
class Chrome:
def __new__(cls, *args, emulate_touch=False, **kwargs):
if not ChromeDriverManager.installed:
ChromeDriverManager(*args, **kwargs).install()
if not ChromeDriverManager.selenium_patched:
ChromeDriverManager(*args, **kwargs).patch_selenium_webdriver()
if not kwargs.get("executable_path"):
kwargs["executable_path"] = "./{}".format(
ChromeDriverManager(*args, **kwargs).executable_path
)
if not kwargs.get("options"):
kwargs["options"] = ChromeOptions()
instance = object.__new__(_Chrome)
instance.__init__(*args, **kwargs)
instance._orig_get = instance.get
def _get_wrapped(*args, **kwargs):
if instance.execute_script("return navigator.webdriver"):
instance.execute_cdp_cmd(
"Page.addScriptToEvaluateOnNewDocument",
{
"source": """
Object.defineProperty(window, 'navigator', {
value: new Proxy(navigator, {
has: (target, key) => (key === 'webdriver' ? false : key in target),
get: (target, key) =>
key === 'webdriver'
? undefined
: typeof target[key] === 'function'
? target[key].bind(target)
: target[key]
})
});
"""
},
)
return instance._orig_get(*args, **kwargs)
instance.get = _get_wrapped
instance.get = _get_wrapped
instance.get = _get_wrapped
original_user_agent_string = instance.execute_script(
"return navigator.userAgent"
)
instance.execute_cdp_cmd(
"Network.setUserAgentOverride",
{
"userAgent": original_user_agent_string.replace("Headless", ""),
},
)
if emulate_touch:
instance.execute_cdp_cmd(
"Page.addScriptToEvaluateOnNewDocument",
{
"source": """
Object.defineProperty(navigator, 'maxTouchPoints', {
get: () => 1
})"""
},
)
logger.info(f"starting undetected_chromedriver.Chrome({args}, {kwargs})")
return instance
class ChromeOptions:
def __new__(cls, *args, **kwargs):
if not ChromeDriverManager.installed:
ChromeDriverManager(*args, **kwargs).install()
if not ChromeDriverManager.selenium_patched:
ChromeDriverManager(*args, **kwargs).patch_selenium_webdriver()
instance = object.__new__(_ChromeOptions)
instance.__init__()
instance.add_argument("start-maximized")
instance.add_experimental_option("excludeSwitches", ["enable-automation"])
instance.add_argument("--disable-blink-features=AutomationControlled")
return instance
class ChromeDriverManager(object):
installed = False
selenium_patched = False
target_version = None
DL_BASE = "https://chromedriver.storage.googleapis.com/"
def __init__(self, executable_path=None, target_version=None, *args, **kwargs):
_platform = sys.platform
if TARGET_VERSION:
# use global if set
self.target_version = TARGET_VERSION
if target_version:
# use explicitly passed target
self.target_version = target_version # user override
if not self.target_version:
# none of the above (default) and just get current version
self.target_version = self.get_release_version_number().version[
0
] # only major version int
self._base = base_ = "chromedriver{}"
exe_name = self._base
if _platform in ("win32",):
exe_name = base_.format(".exe")
if _platform in ("linux",):
_platform += "64"
exe_name = exe_name.format("")
if _platform in ("darwin",):
_platform = "mac64"
exe_name = exe_name.format("")
self.platform = _platform
self.executable_path = executable_path or exe_name
self._exe_name = exe_name
def patch_selenium_webdriver(self_):
"""
Patches selenium package Chrome, ChromeOptions classes for current session
:return:
"""
import selenium.webdriver.chrome.service
import selenium.webdriver
selenium.webdriver.Chrome = Chrome
selenium.webdriver.ChromeOptions = ChromeOptions
logger.info("Selenium patched. Safe to import Chrome / ChromeOptions")
self_.__class__.selenium_patched = True
def install(self, patch_selenium=True):
"""
Initialize the patch
This will:
download chromedriver if not present
patch the downloaded chromedriver
patch selenium package if <patch_selenium> is True (default)
:param patch_selenium: patch selenium webdriver classes for Chrome and ChromeDriver (for current python session)
:return:
"""
if not os.path.exists(self.executable_path):
self.fetch_chromedriver()
if not self.__class__.installed:
if self.patch_binary():
self.__class__.installed = True
if patch_selenium:
self.patch_selenium_webdriver()
def get_release_version_number(self):
"""
Gets the latest major version available, or the latest major version of self.target_version if set explicitly.
:return: version string
"""
path = (
"LATEST_RELEASE"
if not self.target_version
else f"LATEST_RELEASE_{self.target_version}"
)
return LooseVersion(urlopen(self.__class__.DL_BASE + path).read().decode())
def fetch_chromedriver(self):
"""
Downloads ChromeDriver from source and unpacks the executable
:return: on success, name of the unpacked executable
"""
base_ = self._base
zip_name = base_.format(".zip")
ver = self.get_release_version_number().vstring
if os.path.exists(self.executable_path):
return self.executable_path
urlretrieve(
f"{self.__class__.DL_BASE}{ver}/{base_.format(f'_{self.platform}')}.zip",
filename=zip_name,
)
with zipfile.ZipFile(zip_name) as zf:
zf.extract(self._exe_name)
os.remove(zip_name)
if sys.platform != "win32":
os.chmod(self._exe_name, 0o755)
return self._exe_name
@staticmethod
def random_cdc():
cdc = random.choices(string.ascii_lowercase, k=26)
cdc[-6:-4] = map(str.upper, cdc[-6:-4])
cdc[2] = cdc[0]
cdc[3] = "_"
return "".join(cdc).encode()
def patch_binary(self):
"""
Patches the ChromeDriver binary
:return: False on failure, binary name on success
"""
linect = 0
replacement = self.random_cdc()
with io.open(self.executable_path, "r+b") as fh:
for line in iter(lambda: fh.readline(), b""):
if b"cdc_" in line:
fh.seek(-len(line), 1)
newline = re.sub(b"cdc_.{22}", replacement, line)
fh.write(newline)
linect += 1
return linect
def install(executable_path=None, target_version=None, *args, **kwargs):
ChromeDriverManager(executable_path, target_version, *args, **kwargs).install()

View File

@ -35,14 +35,16 @@ class PageElement(CDPObject):
class CDP:
log = logging.getLogger("CDP")
endpoints = CDPObject({
"json": "/json",
"protocol": "/json/protocol",
"list": "/json/list",
"new": "/json/new?{url}",
"activate": "/json/activate/{id}",
"close": "/json/close/{id}",
})
endpoints = CDPObject(
{
"json": "/json",
"protocol": "/json/protocol",
"list": "/json/list",
"new": "/json/new?{url}",
"activate": "/json/activate/{id}",
"close": "/json/close/{id}",
}
)
def __init__(self, options: "ChromeOptions"): # noqa
self.server_addr = "http://{0}:{1}".format(*options.debugger_address.split(":"))
@ -58,7 +60,7 @@ class CDP:
def tab_activate(self, id=None):
if not id:
active_tab = self.tab_list()[0]
active_tab = self.tab_list()[0]
id = active_tab.id # noqa
self.wsurl = active_tab.webSocketDebuggerUrl # noqa
return self.post(self.endpoints["activate"].format(id=id))

View File

@ -0,0 +1,191 @@
import asyncio
import logging
import time
import traceback
from collections.abc import Mapping
from collections.abc import Sequence
from typing import Any
from typing import Awaitable
from typing import Callable
from typing import List
from typing import Optional
from contextlib import ExitStack
import threading
from functools import wraps, partial
class Structure(dict):
"""
This is a dict-like object structure, which you should subclass
Only properties defined in the class context are used on initialization.
See example
"""
_store = {}
def __init__(self, *a, **kw):
"""
Instantiate a new instance.
:param a:
:param kw:
"""
super().__init__()
# auxiliar dict
d = dict(*a, **kw)
for k, v in d.items():
if isinstance(v, Mapping):
self[k] = self.__class__(v)
elif isinstance(v, Sequence) and not isinstance(v, (str, bytes)):
self[k] = [self.__class__(i) for i in v]
else:
self[k] = v
super().__setattr__("__dict__", self)
def __getattr__(self, item):
return getattr(super(), item)
def __getitem__(self, item):
return super().__getitem__(item)
def __setattr__(self, key, value):
self.__setitem__(key, value)
def __setitem__(self, key, value):
super().__setitem__(key, value)
def update(self, *a, **kw):
super().update(*a, **kw)
def __eq__(self, other):
return frozenset(other.items()) == frozenset(self.items())
def __hash__(self):
return hash(frozenset(self.items()))
@classmethod
def __init_subclass__(cls, **kwargs):
cls._store = {}
def _normalize_strings(self):
for k, v in self.copy().items():
if isinstance(v, (str)):
self[k] = v.strip()
def timeout(seconds=3, on_timeout: Optional[Callable[[callable], Any]] = None):
def wrapper(func):
@wraps(func)
def wrapped(*args, **kwargs):
def function_reached_timeout():
if on_timeout:
on_timeout(func)
else:
raise TimeoutError("function call timed out")
t = threading.Timer(interval=seconds, function=function_reached_timeout)
t.start()
try:
return func(*args, **kwargs)
except:
t.cancel()
raise
finally:
t.cancel()
return wrapped
return wrapper
def test():
import sys, os
sys.path.insert(0, os.path.abspath(os.path.dirname(__file__)))
import undetected_chromedriver as uc
import threading
def collector(
driver: uc.Chrome,
stop_event: threading.Event,
on_event_coro: Optional[Callable[[List[str]], Awaitable[Any]]] = None,
listen_events: Sequence = ("browser", "network", "performance"),
):
def threaded(driver, stop_event, on_event_coro):
async def _ensure_service_started():
while (
getattr(driver, "service", False)
and getattr(driver.service, "process", False)
and driver.service.process.poll()
):
print("waiting for driver service to come back on")
await asyncio.sleep(0.05)
# await asyncio.sleep(driver._delay or .25)
async def get_log_lines(typ):
await _ensure_service_started()
return driver.get_log(typ)
async def looper():
while not stop_event.is_set():
log_lines = []
try:
for _ in listen_events:
try:
log_lines += await get_log_lines(_)
except:
if logging.getLogger().getEffectiveLevel() <= 10:
traceback.print_exc()
continue
if log_lines and on_event_coro:
await on_event_coro(log_lines)
except Exception as e:
if logging.getLogger().getEffectiveLevel() <= 10:
traceback.print_exc()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(looper())
t = threading.Thread(target=threaded, args=(driver, stop_event, on_event_coro))
t.start()
async def on_event(data):
print("on_event")
print("data:", data)
def func_called(fn):
def wrapped(*args, **kwargs):
print(
"func called! %s (args: %s, kwargs: %s)" % (fn.__name__, args, kwargs)
)
while driver.service.process and driver.service.process.poll() is not None:
time.sleep(0.1)
res = fn(*args, **kwargs)
print("func completed! (result: %s)" % res)
return res
return wrapped
logging.basicConfig(level=10)
options = uc.ChromeOptions()
options.set_capability(
"goog:loggingPrefs", {"performance": "ALL", "browser": "ALL", "network": "ALL"}
)
driver = uc.Chrome(version_main=96, options=options)
# driver.command_executor._request = timeout(seconds=1)(driver.command_executor._request)
driver.command_executor._request = func_called(driver.command_executor._request)
collector_stop = threading.Event()
collector(driver, collector_stop, on_event)
driver.get("https://nowsecure.nl")
time.sleep(10)
driver.quit()

View File

@ -27,8 +27,12 @@ def start_detached(executable, *args):
reader, writer = multiprocessing.Pipe(False)
# do not keep reference
multiprocessing.Process(target=_start_detached, args=(executable, *args), kwargs={'writer': writer},
daemon=True).start()
multiprocessing.Process(
target=_start_detached,
args=(executable, *args),
kwargs={"writer": writer},
daemon=True,
).start()
# receive pid from pipe
pid = reader.recv()
REGISTERED.append(pid)
@ -43,7 +47,7 @@ def _start_detached(executable, *args, writer: multiprocessing.Pipe = None):
# configure launch
kwargs = {}
if platform.system() == 'Windows':
if platform.system() == "Windows":
kwargs.update(creationflags=DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP)
elif sys.version_info < (3, 2):
# assume posix
@ -62,11 +66,10 @@ def _start_detached(executable, *args, writer: multiprocessing.Pipe = None):
def _cleanup():
for pid in REGISTERED:
try:
logging.getLogger(__name__).debug('cleaning up pid %d ' % pid)
logging.getLogger(__name__).debug("cleaning up pid %d " % pid)
os.kill(pid, signal.SIGTERM)
except: # noqa
pass
atexit.register(_cleanup)

View File

@ -1,35 +1,15 @@
#!/usr/bin/env python3
# this module is part of undetected_chromedriver
import base64
import os
from selenium.webdriver.chrome.options import Options as _ChromeOptions
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
from selenium.webdriver.chromium.options import ChromiumOptions as _ChromiumOptions
class ChromeOptions(_ChromeOptions):
KEY = "goog:chromeOptions"
class ChromeOptions(_ChromiumOptions):
_session = None
emulate_touch = True
mock_permissions = True
mock_chrome_global = False
mock_canvas_fp = True
_user_data_dir = None
def __init__(self):
super().__init__()
self._arguments = []
self._binary_location = ""
self._extension_files = []
self._extensions = []
self._experimental_options = {}
self._debugger_address = None
self._caps = self.default_capabilities
self.mobile_options = None
self.set_capability("pageLoadStrategy", "normal")
@property
def user_data_dir(self):
return self._user_data_dir
@ -49,207 +29,6 @@ class ChromeOptions(_ChromeOptions):
apath = os.path.abspath(path)
self._user_data_dir = os.path.normpath(apath)
@property
def arguments(self):
"""
:Returns: A list of arguments needed for the browser
"""
return self._arguments
@property
def binary_location(self) -> str:
"""
:Returns: The location of the binary, otherwise an empty string
"""
return self._binary_location
@binary_location.setter
def binary_location(self, value: str):
"""
Allows you to set where the chromium binary lives
:Args:
- value: path to the Chromium binary
"""
self._binary_location = value
@property
def debugger_address(self) -> str:
"""
:Returns: The address of the remote devtools instance
"""
return self._debugger_address
@debugger_address.setter
def debugger_address(self, value: str):
"""
Allows you to set the address of the remote devtools instance
that the ChromeDriver instance will try to connect to during an
active wait.
:Args:
- value: address of remote devtools instance if any (hostname[:port])
"""
self._debugger_address = value
@property
def extensions(self):
"""
:Returns: A list of encoded extensions that will be loaded
"""
encoded_extensions = []
for ext in self._extension_files:
file_ = open(ext, "rb")
# Should not use base64.encodestring() which inserts newlines every
# 76 characters (per RFC 1521). Chromedriver has to remove those
# unnecessary newlines before decoding, causing performance hit.
encoded_extensions.append(base64.b64encode(file_.read()).decode("UTF-8"))
file_.close()
return encoded_extensions + self._extensions
def add_extension(self, extension: str):
"""
Adds the path to the extension to a list that will be used to extract it
to the ChromeDriver
:Args:
- extension: path to the \\*.crx file
"""
if extension:
extension_to_add = os.path.abspath(os.path.expanduser(extension))
if os.path.exists(extension_to_add):
self._extension_files.append(extension_to_add)
else:
raise IOError("Path to the extension doesn't exist")
else:
raise ValueError("argument can not be null")
def add_encoded_extension(self, extension: str):
"""
Adds Base64 encoded string with extension data to a list that will be used to extract it
to the ChromeDriver
:Args:
- extension: Base64 encoded string with extension data
"""
if extension:
self._extensions.append(extension)
else:
raise ValueError("argument can not be null")
@property
def experimental_options(self) -> dict:
"""
:Returns: A dictionary of experimental options for chromium
"""
return self._experimental_options
def add_experimental_option(self, name: str, value: dict):
"""
Adds an experimental option which is passed to chromium.
:Args:
name: The experimental option name.
value: The option value.
"""
self._experimental_options[name] = value
@property
def headless(self) -> bool:
"""
:Returns: True if the headless argument is set, else False
"""
return "--headless" in self._arguments
@headless.setter
def headless(self, value: bool):
"""
Sets the headless argument
:Args:
value: boolean value indicating to set the headless option
"""
args = {"--headless"}
if value is True:
self._arguments.extend(args)
else:
self._arguments = list(set(self._arguments) - args)
@property
def page_load_strategy(self) -> str:
return self._caps["pageLoadStrategy"]
@page_load_strategy.setter
def page_load_strategy(self, strategy: str):
if strategy in ["normal", "eager", "none"]:
self.set_capability("pageLoadStrategy", strategy)
else:
raise ValueError(
"Strategy can only be one of the following: normal, eager, none"
)
@property
def capabilities(self):
return self._caps
def set_capability(self, name, value):
""" Sets a capability """
self._caps[name] = value
def to_capabilities(self) -> dict:
"""
Creates a capabilities with all the options that have been set
:Returns: A dictionary with everything
"""
caps = self._caps
chrome_options = self.experimental_options.copy()
if self.mobile_options:
chrome_options.update(self.mobile_options)
chrome_options["extensions"] = self.extensions
if self.binary_location:
chrome_options["binary"] = self.binary_location
chrome_options["args"] = self._arguments
if self.debugger_address:
chrome_options["debuggerAddress"] = self.debugger_address
caps[self.KEY] = chrome_options
return caps
def ignore_local_proxy_environment_variables(self):
"""
By calling this you will ignore HTTP_PROXY and HTTPS_PROXY from being picked up and used.
"""
self._ignore_local_proxy = True
@property
def default_capabilities(self) -> dict:
return DesiredCapabilities.CHROME.copy()
def enable_mobile(
self,
android_package: str = None,
android_activity: str = None,
device_serial: str = None,
):
"""
Enables mobile browser use for browsers that support it
:Args:
android_activity: The name of the android package to start
"""
if not android_package:
raise AttributeError("android_package must be passed in")
self.mobile_options = {"androidPackage": android_package}
if android_activity:
self.mobile_options["androidActivity"] = android_activity
if device_serial:
self.mobile_options["androidDeviceSerial"] = device_serial
def add_argument(self, argument):
"""
Adds an argument to the list
:Args:
- Sets the arguments
"""
if argument:
self._arguments.append(argument)
else:
raise ValueError("argument can not be null")
@classmethod
def from_options(cls, options):
o = cls()

View File

@ -50,11 +50,24 @@ class Reactor(threading.Thread):
except Exception as e:
logger.warning("Reactor.run() => %s", e)
async def _wait_service_started(self):
while True:
with self.lock:
if (
self.driver.service
and self.driver.service.process
and self.driver.process.process.poll()
):
await asyncio.sleep(self.driver._delay or 0.25)
else:
break
async def listen(self):
while self.running:
await asyncio.sleep(0)
await self._wait_service_started()
await asyncio.sleep(1)
try:
with self.lock:

View File

@ -1,319 +0,0 @@
(function (name, context, definition) {
if (typeof module !== 'undefined' && module.exports) {
module.exports = definition();
} else if (typeof define === 'function' && define.amd) {
define(definition);
} else {
context[name] = definition();
}
})('Fingerprint', this, function () {
'use strict';
var Fingerprint = function (options) {
var nativeForEach, nativeMap;
nativeForEach = Array.prototype.forEach;
nativeMap = Array.prototype.map;
this.each = function (obj, iterator, context) {
if (obj === null) {
return;
}
if (nativeForEach && obj.forEach === nativeForEach) {
obj.forEach(iterator, context);
} else if (obj.length === +obj.length) {
for (var i = 0, l = obj.length; i < l; i++) {
if (iterator.call(context, obj[i], i, obj) === {}) return;
}
} else {
for (var key in obj) {
if (obj.hasOwnProperty(key)) {
if (iterator.call(context, obj[key], key, obj) === {}) return;
}
}
}
};
this.map = function (obj, iterator, context) {
var results = [];
// Not using strict equality so that this acts as a
// shortcut to checking for `null` and `undefined`.
if (obj == null) return results;
if (nativeMap && obj.map === nativeMap) return obj.map(iterator, context);
this.each(obj, function (value, index, list) {
results[results.length] = iterator.call(context, value, index, list);
});
return results;
};
if (typeof options == 'object') {
this.hasher = options.hasher;
this.screen_resolution = options.screen_resolution;
this.screen_orientation = options.screen_orientation;
this.canvas = options.canvas;
this.ie_activex = options.ie_activex;
} else if (typeof options == 'function') {
this.hasher = options;
}
};
Fingerprint.prototype = {
get: function () {
var keys = [];
keys.push(navigator.userAgent);
keys.push(navigator.language);
keys.push(screen.colorDepth);
if (this.screen_resolution) {
var resolution = this.getScreenResolution();
if (typeof resolution !== 'undefined') { // headless browsers, such as phantomjs
keys.push(resolution.join('x'));
}
}
keys.push(new Date().getTimezoneOffset());
keys.push(this.hasSessionStorage());
keys.push(this.hasLocalStorage());
keys.push(this.hasIndexDb());
//body might not be defined at this point or removed programmatically
if (document.body) {
keys.push(typeof (document.body.addBehavior));
} else {
keys.push(typeof undefined);
}
keys.push(typeof (window.openDatabase));
keys.push(navigator.cpuClass);
keys.push(navigator.platform);
keys.push(navigator.doNotTrack);
keys.push(this.getPluginsString());
if (this.canvas && this.isCanvasSupported()) {
keys.push(this.getCanvasFingerprint());
}
if (this.hasher) {
return this.hasher(keys.join('###'), 31);
} else {
return this.murmurhash3_32_gc(keys.join('###'), 31);
}
},
/**
* JS Implementation of MurmurHash3 (r136) (as of May 20, 2011)
*
* @author Gary Court
* @see http://github.com/garycourt/murmurhash-js
* @author Austin Appleby
* @see http://sites.google.com/site/murmurhash/
*
* @param {string} key ASCII only
* @param {number} seed Positive integer only
* @return {number} 32-bit positive integer hash
*/
murmurhash3_32_gc: function (key, seed) {
var remainder, bytes, h1, h1b, c1, c2, k1, i;
remainder = key.length & 3; // key.length % 4
bytes = key.length - remainder;
h1 = seed;
c1 = 0xcc9e2d51;
c2 = 0x1b873593;
i = 0;
while (i < bytes) {
k1 =
((key.charCodeAt(i) & 0xff)) |
((key.charCodeAt(++i) & 0xff) << 8) |
((key.charCodeAt(++i) & 0xff) << 16) |
((key.charCodeAt(++i) & 0xff) << 24);
++i;
k1 = ((((k1 & 0xffff) * c1) + ((((k1 >>> 16) * c1) & 0xffff) << 16))) & 0xffffffff;
k1 = (k1 << 15) | (k1 >>> 17);
k1 = ((((k1 & 0xffff) * c2) + ((((k1 >>> 16) * c2) & 0xffff) << 16))) & 0xffffffff;
h1 ^= k1;
h1 = (h1 << 13) | (h1 >>> 19);
h1b = ((((h1 & 0xffff) * 5) + ((((h1 >>> 16) * 5) & 0xffff) << 16))) & 0xffffffff;
h1 = (((h1b & 0xffff) + 0x6b64) + ((((h1b >>> 16) + 0xe654) & 0xffff) << 16));
}
k1 = 0;
switch (remainder) {
case 3:
k1 ^= (key.charCodeAt(i + 2) & 0xff) << 16;
case 2:
k1 ^= (key.charCodeAt(i + 1) & 0xff) << 8;
case 1:
k1 ^= (key.charCodeAt(i) & 0xff);
k1 = (((k1 & 0xffff) * c1) + ((((k1 >>> 16) * c1) & 0xffff) << 16)) & 0xffffffff;
k1 = (k1 << 15) | (k1 >>> 17);
k1 = (((k1 & 0xffff) * c2) + ((((k1 >>> 16) * c2) & 0xffff) << 16)) & 0xffffffff;
h1 ^= k1;
}
h1 ^= key.length;
h1 ^= h1 >>> 16;
h1 = (((h1 & 0xffff) * 0x85ebca6b) + ((((h1 >>> 16) * 0x85ebca6b) & 0xffff) << 16)) & 0xffffffff;
h1 ^= h1 >>> 13;
h1 = ((((h1 & 0xffff) * 0xc2b2ae35) + ((((h1 >>> 16) * 0xc2b2ae35) & 0xffff) << 16))) & 0xffffffff;
h1 ^= h1 >>> 16;
return h1 >>> 0;
},
// https://bugzilla.mozilla.org/show_bug.cgi?id=781447
hasLocalStorage: function () {
try {
return !!window.localStorage;
} catch (e) {
return true; // SecurityError when referencing it means it exists
}
},
hasSessionStorage: function () {
try {
return !!window.sessionStorage;
} catch (e) {
return true; // SecurityError when referencing it means it exists
}
},
hasIndexDb: function () {
try {
return !!window.indexedDB;
} catch (e) {
return true; // SecurityError when referencing it means it exists
}
},
isCanvasSupported: function () {
var elem = document.createElement('canvas');
return !!(elem.getContext && elem.getContext('2d'));
},
isIE: function () {
if (navigator.appName === 'Microsoft Internet Explorer') {
return true;
} else if (navigator.appName === 'Netscape' && /Trident/.test(navigator.userAgent)) {// IE 11
return true;
}
return false;
},
getPluginsString: function () {
if (this.isIE() && this.ie_activex) {
return this.getIEPluginsString();
} else {
return this.getRegularPluginsString();
}
},
getRegularPluginsString: function () {
return this.map(navigator.plugins, function (p) {
var mimeTypes = this.map(p, function (mt) {
return [mt.type, mt.suffixes].join('~');
}).join(',');
return [p.name, p.description, mimeTypes].join('::');
}, this).join(';');
},
getIEPluginsString: function () {
if (window.ActiveXObject) {
var names = ['ShockwaveFlash.ShockwaveFlash',//flash plugin
'AcroPDF.PDF', // Adobe PDF reader 7+
'PDF.PdfCtrl', // Adobe PDF reader 6 and earlier, brrr
'QuickTime.QuickTime', // QuickTime
// 5 versions of real players
'rmocx.RealPlayer G2 Control',
'rmocx.RealPlayer G2 Control.1',
'RealPlayer.RealPlayer(tm) ActiveX Control (32-bit)',
'RealVideo.RealVideo(tm) ActiveX Control (32-bit)',
'RealPlayer',
'SWCtl.SWCtl', // ShockWave player
'WMPlayer.OCX', // Windows media player
'AgControl.AgControl', // Silverlight
'Skype.Detection'];
// starting to detect plugins in IE
return this.map(names, function (name) {
try {
new ActiveXObject(name);
return name;
} catch (e) {
return null;
}
}).join(';');
} else {
return ""; // behavior prior version 0.5.0, not breaking backwards compat.
}
},
getScreenResolution: function () {
var resolution;
if (this.screen_orientation) {
resolution = (screen.height > screen.width) ? [screen.height, screen.width] : [screen.width, screen.height];
} else {
resolution = [screen.height, screen.width];
}
return resolution;
},
getCanvasFingerprint: function () {
var canvas = document.createElement('canvas');
var ctx = canvas.getContext('2d');
// https://www.browserleaks.com/canvas#how-does-it-work
var txt = 'http://valve.github.io';
ctx.textBaseline = "top";
ctx.font = "14px 'Arial'";
ctx.textBaseline = "alphabetic";
ctx.fillStyle = "#f60";
ctx.fillRect(125, 1, 62, 20);
ctx.fillStyle = "#069";
ctx.fillText(txt, 2, 15);
ctx.fillStyle = "rgba(102, 204, 0, 0.7)";
ctx.fillText(txt, 4, 17);
return canvas.toDataURL();
}
};
return Fingerprint;
});
new Fingerprint({canvas: true}).get();
var inject = function () {
var overwrite = function (name) {
const OLD = HTMLCanvasElement.prototype[name];
Object.defineProperty(HTMLCanvasElement.prototype, name, {
"value": function () {
var shift = {
'r': Math.floor(Math.random() * 10) - 5,
'g': Math.floor(Math.random() * 10) - 5,
'b': Math.floor(Math.random() * 10) - 5,
'a': Math.floor(Math.random() * 10) - 5
};
var width = this.width, height = this.height, context = this.getContext("2d");
var imageData = context.getImageData(0, 0, width, height);
for (var i = 0; i < height; i++) {
for (var j = 0; j < width; j++) {
var n = ((i * (width * 4)) + (j * 4));
imageData.data[n + 0] = imageData.data[n + 0] + shift.r;
imageData.data[n + 1] = imageData.data[n + 1] + shift.g;
imageData.data[n + 2] = imageData.data[n + 2] + shift.b;
imageData.data[n + 3] = imageData.data[n + 3] + shift.a;
}
}
context.putImageData(imageData, 0, 0);
return OLD.apply(this, arguments);
}
});
};
overwrite('toBlob');
overwrite('toDataURL');
};
inject();
new Fingerprint({canvas: true}).get();

View File

@ -1,63 +0,0 @@
import logging
import sys
import time # noqa
logging.basicConfig(level=10)
logger = logging.getLogger("TEST")
logger.setLevel(20)
JS_SERIALIZE_FUNCTION = """
decycle=function(n,e){"use strict";var t=new WeakMap;return function n(o,r){var c,i;return void 0!==e&&(o=e(o)),"object"!=typeof o||null===o||o instanceof Boolean||o instanceof Date||o instanceof Number||o instanceof RegExp||o instanceof String?o:void 0!==(c=t.get(o))?{$ref:c}:(t.set(o,r),Array.isArray(o)?(i=[],o.forEach(function(e,t){i[t]=n(e,r+"["+t+"]")})):(i={},Object.keys(o).forEach(function(e){i[e]=n(o[e],r+"["+JSON.stringify(e)+"]")})),i)}(n,"$")};
function replacer(t){try{if(Array.prototype.splice.call(t).length<100){let e={};for(let r in t)e[r]=t[r];return e}}catch(t){}}
return decycle(window)
"""
def test_quick():
import undetected_chromedriver.v2 as uc
print("uc module: ", uc)
# options = selenium.webdriver.ChromeOptions()
options = uc.ChromeOptions()
options.add_argument("--user-data-dir=c:\\temp")
options.binary_location = uc.find_chrome_executable()
driver = uc.Chrome(
executable_path="./chromedriver.exe",
options=options,
service_log_path="c:\\temp\\service.log.txt",
)
while True:
sys.stdin.read()
def test_undetected_chromedriver():
import undetected_chromedriver.v2 as uc
driver = uc.Chrome()
with driver:
driver.get("https://nowsecure.nl")
time.sleep(4) # sleep only used for timing of screenshot
driver.save_screenshot("nowsecure.nl.png")
with driver:
driver.get("https://cia.gov")
time.sleep(4) # sleep only used for timing of screenshot
driver.save_screenshot("cia.gov.png")
with driver:
driver.get("https://lhcdn.botprotect.io")
time.sleep(4) # sleep only used for timing of screenshot
driver.save_screenshot("notprotect.io.png")
with driver:
driver.get("https://www.datadome.co")
time.sleep(4) # sleep only used for timing of screenshot
driver.save_screenshot("datadome.co.png")
# test_quick()
# #test_undetected_chromedriver()

View File

@ -1,67 +0,0 @@
import asyncio
import logging
import cv2
import undetected_chromedriver.v2 as uc
logging.basicConfig(level=10)
just_some_urls = [
"https://bing.com",
"http://www.google.com",
"https://codepen.io",
"https://",
]
class ChromeDriverCV2Streamer:
def __init__(self, driver):
super().__init__()
self.driver = driver
self.display = None
self.event = asyncio.Event()
self.daemon = True
def stop(self):
self.event.set()
def start(self):
asyncio.ensure_future(self._start_capture_loop())
async def _start_capture_loop(self):
executor = None
self.display = cv2.namedWindow("display")
while not self.event.is_set():
await asyncio.sleep(0.25)
try:
success = await loop.run_in_executor(
executor, self.driver.save_screenshot, "capture.tmp.png"
)
logging.getLogger().debug("got screenshot? %s", success)
frame = await loop.run_in_executor(
executor, cv2.imread, "capture.tmp.png"
)
logging.getLogger().debug("frame: %s", frame)
await loop.run_in_executor(executor, cv2.imshow, "display", frame)
await loop.run_in_executor(executor, cv2.waitKey, 1)
logging.getLogger().debug("waited key success")
except Exception as e:
print(e)
async def main():
opts = uc.ChromeOptions()
opts.headless = True
driver = uc.Chrome(options=opts)
streamer = ChromeDriverCV2Streamer(driver)
streamer.start()
for url in just_some_urls:
# with driver:
driver.get("https://nu.nl")
await asyncio.sleep(3)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

View File

@ -1,63 +0,0 @@
# coding: utf-8
import logging
import os
import sys
import undetected_chromedriver.v2 as uc
# it's not required to enable logging for cdp events to work
# but as this is a test, it's good too it all
logging.basicConfig(level=10)
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("selenium.webdriver.remote.remote_connection").setLevel(logging.WARN)
driver = uc.Chrome(enable_cdp_events=True)
# set the callback to Network.dataReceived to print (yeah not much original)
driver.add_cdp_listener("Network.dataReceived", print)
# example of executing regular cdp commands
driver.execute_cdp_cmd("Network.getAllCookies", {})
# okay another one
driver.execute_cdp_cmd(
"Page.addScriptToEvaluateOnNewDocument",
{"source": """ alert('another new document')"""},
)
# set the callback for ALL events (this may slow down execution)
# driver.add_cdp_listener('*', print)
with driver:
driver.get("https://nowsecure.nl")
driver.save_screenshot("nowsecure.nl.headfull.png")
try:
os.system("nowsecure.nl.headfull.png")
except:
pass
driver.quit()
opts = uc.ChromeOptions()
opts.headless = True
driver = uc.Chrome(enable_cdp_events=True, options=opts)
# okay another one
driver.execute_cdp_cmd(
"Page.addScriptToEvaluateOnNewDocument",
{"source": """ alert('another new document')"""},
)
driver.add_cdp_listener("*", print)
with driver:
driver.get("https://nowsecure.nl")
driver.save_screenshot("nowsecure.nl.headfull.png")
try:
os.system("nowsecure.nl.headfull.png")
except:
pass
while True:
sys.stdin.read()

View File

@ -1,6 +1,6 @@
import pytest
from _pytest.fixtures import FixtureRequest
import undetected_chromedriver.v2 as uc
import undetected_chromedriver as uc
FAILED_SCREENSHOT_NAME = "failed.png"

View File

@ -1,702 +0,0 @@
#!/usr/bin/env python3
# this module is part of undetected_chromedriver
from __future__ import annotations
import asyncio
import json
import logging
import os
import re
import shutil
import sys
import tempfile
import time
import inspect
import requests
import selenium.webdriver.chrome.service
import selenium.webdriver.chrome.webdriver
import selenium.webdriver.common.service
import selenium.webdriver.remote.webdriver
import websockets
from .cdp import CDP
from .options import ChromeOptions
from .patcher import IS_POSIX
from .patcher import Patcher
from .reactor import Reactor
__all__ = (
"Chrome",
"ChromeOptions",
"Patcher",
"Reactor",
"CDP",
"find_chrome_executable",
)
logger = logging.getLogger("uc")
logger.setLevel(logging.getLogger().getEffectiveLevel())
from .dprocess import start_detached
class Chrome(selenium.webdriver.chrome.webdriver.WebDriver):
"""
Controls the ChromeDriver and allows you to drive the browser.
The webdriver file will be downloaded by this module automatically,
you do not need to specify this. however, you may if you wish.
Attributes
----------
Methods
-------
reconnect()
this can be useful in case of heavy detection methods
-stops the chromedriver service which runs in the background
-starts the chromedriver service which runs in the background
-recreate session
start_session(capabilities=None, browser_profile=None)
differentiates from the regular method in that it does not
require a capabilities argument. The capabilities are automatically
recreated from the options at creation time.
--------------------------------------------------------------------------
NOTE:
Chrome has everything included to work out of the box.
it does not `need` customizations.
any customizations MAY lead to trigger bot migitation systems.
--------------------------------------------------------------------------
"""
_instances = set()
session_id = None
def __init__(
self,
executable_path=None,
port=0,
options=None,
enable_cdp_events=False,
service_args=None,
desired_capabilities=None,
service_log_path=None,
keep_alive=False,
log_level=0,
headless=False,
delay=5,
version_main=None,
patcher_force_close=False,
):
"""
Creates a new instance of the chrome driver.
Starts the service and then creates new instance of chrome driver.
Parameters
----------
executable_path: str, optional, default: None - use find_chrome_executable
Path to the executable. If the default is used it assumes the executable is in the $PATH
port: int, optional, default: 0
port you would like the service to run, if left as 0, a free port will be found.
options: ChromeOptions, optional, default: None - automatic useful defaults
this takes an instance of ChromeOptions, mainly to customize browser behavior.
anything other dan the default, for example extensions or startup options
are not supported in case of failure, and can probably lowers your undetectability.
enable_cdp_events: bool, default: False
:: currently for chrome only
this enables the handling of wire messages
when enabled, you can subscribe to CDP events by using:
driver.add_cdp_listener("Network.dataReceived", yourcallback)
# yourcallback is an callable which accepts exactly 1 dict as parameter
service_args: list of str, optional, default: None
arguments to pass to the driver service
desired_capabilities: dict, optional, default: None - auto from config
Dictionary object with non-browser specific capabilities only, such as "item" or "loggingPref".
service_log_path: str, optional, default: None
path to log information from the driver.
keep_alive: bool, optional, default: True
Whether to configure ChromeRemoteConnection to use HTTP keep-alive.
log_level: int, optional, default: adapts to python global log level
headless: bool, optional, default: False
can also be specified in the options instance.
Specify whether you want to use the browser in headless mode.
warning: this lowers undetectability and not fully supported.
delay: int, optional, default: 5
delay in seconds to wait before giving back control.
this is used only when using the context manager
(`with` statement) to bypass, for example CloudFlare.
5 seconds is a foolproof value.
version_main: int, optional, default: None (=auto)
if you, for god knows whatever reason, use
an older version of Chrome. You can specify it's full rounded version number
here. Example: 87 for all versions of 87
patcher_force_close: bool, optional, default: False
instructs the patcher to do whatever it can to access the chromedriver binary
if the file is locked, it will force shutdown all instances.
setting it is not recommended, unless you know the implications and think
you might need it.
"""
patcher = Patcher(
executable_path=executable_path,
force=patcher_force_close,
version_main=version_main,
)
patcher.auto()
if not options:
options = ChromeOptions()
try:
if hasattr(options, "_session") and options._session is not None:
# prevent reuse of options,
# as it just appends arguments, not replace them
# you'll get conflicts starting chrome
raise RuntimeError("you cannot reuse the ChromeOptions object")
except AttributeError:
pass
options._session = self
debug_port = selenium.webdriver.common.service.utils.free_port()
debug_host = "127.0.0.1"
if not options.debugger_address:
options.debugger_address = "%s:%d" % (debug_host, debug_port)
if enable_cdp_events:
options.set_capability("goog:loggingPrefs", {"performance": "ALL"})
options.add_argument("--remote-debugging-host=%s" % debug_host)
options.add_argument("--remote-debugging-port=%s" % debug_port)
user_data_dir, language, keep_user_data_dir = None, None, None
# see if a custom user profile is specified
for arg in options.arguments:
if "lang" in arg:
m = re.search("(?:--)?lang(?:[ =])?(.*)", arg)
try:
language = m[1]
except IndexError:
logger.debug("will set the language to en-US,en;q=0.9")
language = "en-US,en;q=0.9"
if "user-data-dir" in arg:
m = re.search("(?:--)?user-data-dir(?:[ =])?(.*)", arg)
try:
user_data_dir = m[1]
logger.debug(
"user-data-dir found in user argument %s => %s" % (arg, m[1])
)
keep_user_data_dir = True
except IndexError:
logger.debug(
"no user data dir could be extracted from supplied argument %s "
% arg
)
if not user_data_dir:
if options.user_data_dir:
options.add_argument("--user-data-dir=%s" % options.user_data_dir)
keep_user_data_dir = True
logger.debug(
"user_data_dir property found in options object: %s" % user_data_dir
)
else:
user_data_dir = os.path.normpath(tempfile.mkdtemp())
keep_user_data_dir = False
arg = "--user-data-dir=%s" % user_data_dir
options.add_argument(arg)
logger.debug(
"created a temporary folder in which the user-data (profile) will be stored during this\n"
"session, and added it to chrome startup arguments: %s" % arg
)
if not language:
try:
import locale
language = locale.getdefaultlocale()[0].replace("_", "-")
except Exception:
pass
if not language:
language = "en-US"
options.add_argument("--lang=%s" % language)
if not options.binary_location:
options.binary_location = find_chrome_executable()
self._delay = delay
self.user_data_dir = user_data_dir
self.keep_user_data_dir = keep_user_data_dir
if headless or options.headless:
options.headless = True
options.add_argument("--window-size=1920,1080")
options.add_argument("--start-maximized")
options.add_argument("--no-sandbox")
# fixes "could not connect to chrome" error when running
# on linux using privileged user like root (which i don't recommend)
options.add_argument(
"--log-level=%d" % log_level
or divmod(logging.getLogger().getEffectiveLevel(), 10)[0]
)
# fix exit_type flag to prevent tab-restore nag
try:
with open(
os.path.join(user_data_dir, "Default/Preferences"),
encoding="latin1",
mode="r+",
) as fs:
config = json.load(fs)
if config["profile"]["exit_type"] is not None:
# fixing the restore-tabs-nag
config["profile"]["exit_type"] = None
fs.seek(0, 0)
json.dump(config, fs)
logger.debug("fixed exit_type flag")
except Exception as e:
logger.debug("did not find a bad exit_type flag ")
self.options = options
if not desired_capabilities:
desired_capabilities = options.to_capabilities()
self.browser_pid = start_detached(options.binary_location, *options.arguments)
# self.browser = subprocess.Popen(
# [options.binary_location, *options.arguments],
# stdin=subprocess.PIPE,
# stdout=subprocess.PIPE,
# stderr=subprocess.PIPE,
# close_fds=IS_POSIX,
# )
super(Chrome, self).__init__(
executable_path=patcher.executable_path,
port=port,
options=options,
service_args=service_args,
desired_capabilities=desired_capabilities,
service_log_path=service_log_path,
keep_alive=keep_alive,
)
# intentional
# self.webdriver = selenium.webdriver.chrome.webdriver.WebDriver(
# executable_path=patcher.executable_path,
# port=port,
# options=options,
# service_args=service_args,
# desired_capabilities=desired_capabilities,
# service_log_path=service_log_path,
# keep_alive=keep_alive,
# )
self.reactor = None
if enable_cdp_events:
if logging.getLogger().getEffectiveLevel() == logging.DEBUG:
logging.getLogger(
"selenium.webdriver.remote.remote_connection"
).setLevel(20)
reactor = Reactor(self)
reactor.start()
self.reactor = reactor
if options.headless:
self._configure_headless()
orig_get = self.get
# def get_wrapped(*args, **kwargs):
# self.execute_cdp_cmd(
# "Network.setExtraHTTPHeaders",
# {"headers": {"dnt": "1", "cache-control": "no-cache"}},
# )
#
# return orig_get(*args, **kwargs)
#
# self.get = get_wrapped
def _configure_headless(self):
orig_get = self.get
logger.info("setting properties for headless")
def get_wrapped(*args, **kwargs):
if self.execute_script("return navigator.webdriver"):
logger.info("patch navigator.webdriver")
self.execute_cdp_cmd(
"Page.addScriptToEvaluateOnNewDocument",
{
"source": """
Object.defineProperty(window, 'navigator', {
value: new Proxy(navigator, {
has: (target, key) => (key === 'webdriver' ? false : key in target),
get: (target, key) =>
key === 'webdriver' ?
undefined :
typeof target[key] === 'function' ?
target[key].bind(target) :
target[key]
})
});
"""
},
)
logger.info("patch user-agent string")
self.execute_cdp_cmd(
"Network.setUserAgentOverride",
{
"userAgent": self.execute_script(
"return navigator.userAgent"
).replace("Headless", "")
},
)
if self.options.mock_permissions:
logger.info("patch permissions api")
self.execute_cdp_cmd(
"Page.addScriptToEvaluateOnNewDocument",
{
"source": """
// fix Notification permission in headless mode
Object.defineProperty(Notification, 'permission', { get: () => "default"});
"""
},
)
if self.options.emulate_touch:
logger.info("patch emulate touch")
self.execute_cdp_cmd(
"Page.addScriptToEvaluateOnNewDocument",
{
"source": """
Object.defineProperty(navigator, 'maxTouchPoints', {
get: () => 1
})"""
},
)
if self.options.mock_canvas_fp:
logger.info("patch HTMLCanvasElement fingerprinting")
self.execute_cdp_cmd(
"Page.addScriptToEvaluateOnNewDocument",
{
"source": """
(function() {
const ORIGINAL_CANVAS = HTMLCanvasElement.prototype[name];
Object.defineProperty(HTMLCanvasElement.prototype, name, {
"value": function() {
var shift = {
'r': Math.floor(Math.random() * 10) - 5,
'g': Math.floor(Math.random() * 10) - 5,
'b': Math.floor(Math.random() * 10) - 5,
'a': Math.floor(Math.random() * 10) - 5
};
var width = this.width,
height = this.height,
context = this.getContext("2d");
var imageData = context.getImageData(0, 0, width, height);
for (var i = 0; i < height; i++) {
for (var j = 0; j < width; j++) {
var n = ((i * (width * 4)) + (j * 4));
imageData.data[n + 0] = imageData.data[n + 0] + shift.r;
imageData.data[n + 1] = imageData.data[n + 1] + shift.g;
imageData.data[n + 2] = imageData.data[n + 2] + shift.b;
imageData.data[n + 3] = imageData.data[n + 3] + shift.a;
}
}
context.putImageData(imageData, 0, 0);
return ORIGINAL_CANVAS.apply(this, arguments);
}
});
})(this)
"""
},
)
if self.options.mock_chrome_global:
self.execute_cdp_cmd(
"Page.addScriptToEvaluateOnNewDocument",
{
"source": """
Object.defineProperty(window, 'chrome', {
value: new Proxy(window.chrome, {
has: (target, key) => true,
get: (target, key) => {
return {
app: {
isInstalled: false,
},
webstore: {
onInstallStageChanged: {},
onDownloadProgress: {},
},
runtime: {
PlatformOs: {
MAC: 'mac',
WIN: 'win',
ANDROID: 'android',
CROS: 'cros',
LINUX: 'linux',
OPENBSD: 'openbsd',
},
PlatformArch: {
ARM: 'arm',
X86_32: 'x86-32',
X86_64: 'x86-64',
},
PlatformNaclArch: {
ARM: 'arm',
X86_32: 'x86-32',
X86_64: 'x86-64',
},
RequestUpdateCheckStatus: {
THROTTLED: 'throttled',
NO_UPDATE: 'no_update',
UPDATE_AVAILABLE: 'update_available',
},
OnInstalledReason: {
INSTALL: 'install',
UPDATE: 'update',
CHROME_UPDATE: 'chrome_update',
SHARED_MODULE_UPDATE: 'shared_module_update',
},
OnRestartRequiredReason: {
APP_UPDATE: 'app_update',
OS_UPDATE: 'os_update',
PERIODIC: 'periodic',
},
},
}
}
})
});
"""
},
)
return orig_get(*args, **kwargs)
self.get = get_wrapped
def __dir__(self):
return object.__dir__(self)
def get(self, url):
tabs = requests.get('http://{0}:{1}/json'.format(*self.options.debugger_address.split(':'))).json()
for tab in tabs:
if tab['type'] == 'page':
break
async def _get():
wsurl = tab['webSocketDebuggerUrl']
async with websockets.connect(wsurl) as ws:
await ws.send(json.dumps({"method": "Page.navigate", "params": {"url": url}, "id": 1}))
return await ws.recv()
with self:
return asyncio.get_event_loop().run_until_complete(_get())
def add_cdp_listener(self, event_name, callback):
if (
self.reactor
and self.reactor is not None
and isinstance(self.reactor, Reactor)
):
self.reactor.add_event_handler(event_name, callback)
return self.reactor.handlers
return False
def clear_cdp_listeners(self):
if self.reactor and isinstance(self.reactor, Reactor):
self.reactor.handlers.clear()
def tab_new(self, url: str):
"""
this opens a url in a new tab.
apparently, that passes all tests directly!
Parameters
----------
url
Returns
-------
"""
if not hasattr(self, "cdp"):
from .cdp import CDP
self.cdp = CDP(self.options)
self.cdp.tab_new(url)
def reconnect(self, timeout=0.1):
try:
self.service.stop()
except Exception as e:
logger.debug(e)
time.sleep(timeout)
try:
self.service.start()
except Exception as e:
logger.debug(e)
try:
self.start_session()
except Exception as e:
logger.debug(e)
def start_session(self, capabilities=None, browser_profile=None):
if not capabilities:
capabilities = self.options.to_capabilities()
super(Chrome, self).start_session(capabilities, browser_profile)
def quit(self):
logger.debug("closing webdriver")
self.service.process.kill()
try:
if self.reactor and isinstance(self.reactor, Reactor):
self.reactor.event.set()
except Exception: # noqa
pass
try:
logger.debug("killing browser")
os.kill(self.browser_pid)
# self.browser.terminate()
# self.browser.wait(1)
except TimeoutError as e:
logger.debug(e, exc_info=True)
except Exception: # noqa
pass
if (
hasattr(self, "keep_user_data_dir")
and hasattr(self, "user_data_dir")
and not self.keep_user_data_dir
):
for _ in range(5):
try:
logger.debug("removing profile : %s" % self.user_data_dir)
shutil.rmtree(self.user_data_dir, ignore_errors=False)
except FileNotFoundError:
pass
except PermissionError:
logger.debug(
"permission error. files are still in use/locked. retying..."
)
except (RuntimeError, OSError) as e:
logger.debug("%s retying..." % e)
else:
break
time.sleep(0.1)
def __del__(self):
try:
self.service.process.kill()
except:
pass
self.quit()
def __enter__(self):
try:
curframe = inspect.currentframe()
callframe = inspect.getouterframes(curframe, 2)
caller = callframe[1][3]
logging.getLogger(__name__).debug('__enter__ caller: %s' % caller)
if caller == 'get':
return
except (AttributeError, ValueError, KeyError, OSError) as e:
logging.getLogger(__name__).debug(e)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.service.stop()
time.sleep(self._delay)
self.service.start()
self.start_session()
def __hash__(self):
return hash(self.options.debugger_address)
def find_chrome_executable():
"""
Finds the chrome, chrome beta, chrome canary, chromium executable
Returns
-------
executable_path : str
the full file path to found executable
"""
candidates = set()
if IS_POSIX:
for item in os.environ.get("PATH").split(os.pathsep):
for subitem in ("google-chrome", "chromium", "chromium-browser"):
candidates.add(os.sep.join((item, subitem)))
if "darwin" in sys.platform:
candidates.update(
["/Applications/Google Chrome.app/Contents/MacOS/Google Chrome"]
)
else:
for item in map(
os.environ.get, ("PROGRAMFILES", "PROGRAMFILES(X86)", "LOCALAPPDATA")
):
for subitem in (
"Google/Chrome/Application",
"Google/Chrome Beta/Application",
"Google/Chrome Canary/Application",
):
candidates.add(os.sep.join((item, subitem, "chrome.exe")))
for candidate in candidates:
if os.path.exists(candidate) and os.access(candidate, os.X_OK):
return os.path.normpath(candidate)