- fix unlinking driver at exit

- speedup exit process
 - fix creation of driver in multithreaded scenario
 - experimental_option now supports "nested" string  (eg: example: options.add_experimental_option("prefs": {"profile.default_content_setting_values.images": 2 })   )

Author:    sebdelsol <seb.morin@gmail.com>
Author:    UltrafunkAmsterdam
This commit is contained in:
sebdelsol 2022-03-16 20:32:26 +01:00 committed by UltrafunkAmsterdam
parent fdd8e3c705
commit 4879698118
5 changed files with 80 additions and 31 deletions

View File

@ -29,9 +29,10 @@ with codecs.open(
except Exception: except Exception:
raise RuntimeError("unable to determine version") raise RuntimeError("unable to determine version")
description = ('Selenium.webdriver.Chrome replacement with compatiblity for Brave, and other Chromium based browsers.', description = (
'Not triggered by CloudFlare/Imperva/hCaptcha and such.', "Selenium.webdriver.Chrome replacement with compatiblity for Brave, and other Chromium based browsers.",
'NOTE: results may vary due to many factors. No guarantees are given, except for ongoing efforts in understanding detection algorithms.' "Not triggered by CloudFlare/Imperva/hCaptcha and such.",
"NOTE: results may vary due to many factors. No guarantees are given, except for ongoing efforts in understanding detection algorithms.",
) )
setup( setup(
@ -48,9 +49,7 @@ setup(
author="UltrafunkAmsterdam", author="UltrafunkAmsterdam",
author_email="info@blackhat-security.nl", author_email="info@blackhat-security.nl",
description=description, description=description,
long_description=open(os.path.join(dirname, "README.md"), encoding="utf-8").read(), long_description=open(os.path.join(dirname, "README.md"), encoding="utf-8").read(),
long_description_content_type="text/markdown", long_description_content_type="text/markdown",
classifiers=[ classifiers=[
"License :: OSI Approved :: GNU General Public License v3 (GPLv3)", "License :: OSI Approved :: GNU General Public License v3 (GPLv3)",

View File

@ -19,7 +19,7 @@ by UltrafunkAmsterdam (https://github.com/ultrafunkamsterdam)
""" """
__version__ = "3.1.5r2" __version__ = "3.1.5r3"
import json import json
@ -414,6 +414,7 @@ class Chrome(selenium.webdriver.chrome.webdriver.WebDriver):
if advanced_elements: if advanced_elements:
from .webelement import WebElement from .webelement import WebElement
self._web_element_cls = WebElement self._web_element_cls = WebElement
if options.headless: if options.headless:
@ -555,6 +556,7 @@ class Chrome(selenium.webdriver.chrome.webdriver.WebDriver):
""" """
if not hasattr(self, "cdp"): if not hasattr(self, "cdp"):
from .cdp import CDP from .cdp import CDP
cdp = CDP(self.options) cdp = CDP(self.options)
cdp.tab_new(url) cdp.tab_new(url)
@ -628,7 +630,8 @@ class Chrome(selenium.webdriver.chrome.webdriver.WebDriver):
def __del__(self): def __del__(self):
try: try:
self.service.process.kill() super().quit()
# self.service.process.kill()
except: # noqa except: # noqa
pass pass
self.quit() self.quit()
@ -671,7 +674,7 @@ def find_chrome_executable():
candidates.update( candidates.update(
[ [
"/Applications/Google Chrome.app/Contents/MacOS/Google Chrome", "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome",
"/Applications/Chromium.app/Contents/MacOS/Chromium" "/Applications/Chromium.app/Contents/MacOS/Chromium",
] ]
) )
else: else:
@ -682,7 +685,6 @@ def find_chrome_executable():
"Google/Chrome/Application", "Google/Chrome/Application",
"Google/Chrome Beta/Application", "Google/Chrome Beta/Application",
"Google/Chrome Canary/Application", "Google/Chrome Canary/Application",
): ):
candidates.add(os.sep.join((item, subitem, "chrome.exe"))) candidates.add(os.sep.join((item, subitem, "chrome.exe")))
for candidate in candidates: for candidate in candidates:

View File

@ -2,7 +2,9 @@
# this module is part of undetected_chromedriver # this module is part of undetected_chromedriver
import json
import os import os
from selenium.webdriver.chromium.options import ChromiumOptions as _ChromiumOptions from selenium.webdriver.chromium.options import ChromiumOptions as _ChromiumOptions
@ -29,6 +31,38 @@ class ChromeOptions(_ChromiumOptions):
apath = os.path.abspath(path) apath = os.path.abspath(path)
self._user_data_dir = os.path.normpath(apath) self._user_data_dir = os.path.normpath(apath)
@staticmethod
def _undot_key(key, value):
"""turn a (dotted key, value) into a proper nested dict"""
if "." in key:
key, rest = key.split(".", 1)
value = ChromeOptions._undot_key(rest, value)
return {key: value}
def handle_prefs(self, user_data_dir):
prefs = self.experimental_options.get("prefs")
if prefs:
user_data_dir = user_data_dir or self._user_data_dir
default_path = os.path.join(user_data_dir, "Default")
os.makedirs(default_path, exist_ok=True)
# undot prefs dict keys
undot_prefs = {}
for key, value in prefs.items():
undot_prefs.update(self._undot_key(key, value))
prefs_file = os.path.join(default_path, "Preferences")
if os.path.exists(prefs_file):
with open(prefs_file, encoding="latin1", mode="r") as f:
undot_prefs.update(json.load(f))
with open(prefs_file, encoding="latin1", mode="w") as f:
json.dump(undot_prefs, f)
# remove the experimental_options to avoid an error
del self._experimental_options["prefs"]
@classmethod @classmethod
def from_options(cls, options): def from_options(cls, options):
o = cls() o = cls()

View File

@ -8,6 +8,7 @@ import random
import re import re
import string import string
import sys import sys
import time
import zipfile import zipfile
from distutils.version import LooseVersion from distutils.version import LooseVersion
from urllib.request import urlopen, urlretrieve from urllib.request import urlopen, urlretrieve
@ -61,6 +62,9 @@ class Patcher(object):
self.executable_path = None self.executable_path = None
prefix = secrets.token_hex(8) prefix = secrets.token_hex(8)
if not os.path.exists(self.data_path):
os.makedirs(self.data_path, exist_ok=True)
if not executable_path: if not executable_path:
self.executable_path = os.path.join( self.executable_path = os.path.join(
self.data_path, "_".join([prefix, self.exe_name]) self.data_path, "_".join([prefix, self.exe_name])
@ -71,7 +75,7 @@ class Patcher(object):
if not executable_path[-4:] == ".exe": if not executable_path[-4:] == ".exe":
executable_path += ".exe" executable_path += ".exe"
self.zip_path = os.path.join(self.data_path, self.zip_name) self.zip_path = os.path.join(self.data_path, prefix)
if not executable_path: if not executable_path:
self.executable_path = os.path.abspath( self.executable_path = os.path.abspath(
@ -146,7 +150,7 @@ class Patcher(object):
def parse_exe_version(self): def parse_exe_version(self):
with io.open(self.executable_path, "rb") as f: with io.open(self.executable_path, "rb") as f:
for line in iter(lambda: f.readline(), b""): for line in iter(lambda: f.readline(), b""):
match = re.search(br"platform_handle\x00content\x00([0-9.]*)", line) match = re.search(rb"platform_handle\x00content\x00([0-9.]*)", line)
if match: if match:
return LooseVersion(match[1].decode()) return LooseVersion(match[1].decode())
@ -173,14 +177,12 @@ class Patcher(object):
except (FileNotFoundError, OSError): except (FileNotFoundError, OSError):
pass pass
os.makedirs(os.path.dirname(self.zip_path), mode=0o755, exist_ok=True) os.makedirs(self.zip_path, mode=0o755, exist_ok=True)
with zipfile.ZipFile(fp, mode="r") as zf: with zipfile.ZipFile(fp, mode="r") as zf:
zf.extract(self.exe_name, os.path.dirname(self.zip_path)) zf.extract(self.exe_name, self.zip_path)
os.rename( os.rename(os.path.join(self.zip_path, self.exe_name), self.executable_path)
os.path.join(self.data_path, self.exe_name),
self.executable_path
)
os.remove(fp) os.remove(fp)
os.rmdir(self.zip_path)
os.chmod(self.executable_path, 0o755) os.chmod(self.executable_path, 0o755)
return self.executable_path return self.executable_path
@ -246,15 +248,29 @@ class Patcher(object):
) )
def __del__(self): def __del__(self):
try:
if not self._custom_exe_path:
# we will not delete custom exe paths.
# but this also voids support.
# downloading and patching makes sure you never use the same $cdc values, see patch_exe()
# after all, this program has a focus on detectability...
os.unlink(self.executable_path)
# except (OSError, RuntimeError, PermissionError): if self._custom_exe_path:
# pass # if the driver binary is specified by user
except: # we assume it is important enough to not delete it
raise return
else:
timeout = 3 # stop trying after this many seconds
t = time.monotonic()
while True:
now = time.monotonic()
if now - t > timeout:
# we don't want to wait until the end of time
logger.debug(
"could not unlink %s in time (%d seconds)"
% (self.executable_path, timeout)
)
break
try:
os.unlink(self.executable_path)
logger.debug("successfully unlinked %s" % self.executable_path)
break
except (OSError, RuntimeError, PermissionError):
time.sleep(0.1)
continue
except FileNotFoundError:
break

View File

@ -14,8 +14,6 @@ class WebElement(selenium.webdriver.remote.webelement.WebElement):
""" """
@property @property
def attrs(self): def attrs(self):
if not hasattr(self, "_attrs"): if not hasattr(self, "_attrs"):