Leon 2021-07-30 01:31:15 +02:00 committed by GitHub
commit 9ad1bb3e0b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 87 additions and 59 deletions

View File

@ -34,7 +34,7 @@ from . import options
ChromeOptionsV2 = v2.ChromeOptions ChromeOptionsV2 = v2.ChromeOptions
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
__version__ = "3.0.2" __version__ = "3.0.3"
TARGET_VERSION = 0 TARGET_VERSION = 0

View File

@ -11,7 +11,7 @@ from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
class ChromeOptions(_ChromeOptions): class ChromeOptions(_ChromeOptions):
KEY = "goog:chromeOptions" KEY = "goog:chromeOptions"
session = None _session = None
emulate_touch = True emulate_touch = True
mock_permissions = True mock_permissions = True
mock_chrome_global = False mock_chrome_global = False

View File

@ -56,27 +56,44 @@ class Patcher(object):
""" """
self.force = force self.force = force
self.executable_path = None
if not executable_path: if not executable_path:
executable_path = os.path.join(self.data_path, self.exe_name) self.executable_path = os.path.join(self.data_path, self.exe_name)
if not IS_POSIX: if not IS_POSIX:
if not executable_path[-4:] == ".exe": if executable_path:
executable_path += ".exe" if not executable_path[-4:] == ".exe":
executable_path += ".exe"
self.zip_path = os.path.join(self.data_path, self.zip_name) self.zip_path = os.path.join(self.data_path, self.zip_name)
self.executable_path = os.path.abspath(os.path.join(".", executable_path)) if not executable_path:
self.executable_path = os.path.abspath(
os.path.join(".", self.executable_path)
)
self._custom_exe_path = False
if executable_path:
self._custom_exe_path = True
self.executable_path = executable_path
self.version_main = version_main self.version_main = version_main
self.version_full = None self.version_full = None
def auto(self, executable_path=None, force=False, version_main=None): def auto(self, executable_path=None, force=False, version_main=None):
""" """"""
"""
if executable_path: if executable_path:
self.executable_path = executable_path self.executable_path = executable_path
self._custom_exe_path = True
if self._custom_exe_path:
ispatched = self.is_binary_patched(self.executable_path)
if not ispatched:
return self.patch_exe()
else:
return
if version_main: if version_main:
self.version_main = version_main self.version_main = version_main
if force is True: if force is True:

View File

@ -18,18 +18,25 @@ import selenium.webdriver.chrome.webdriver
import selenium.webdriver.common.service import selenium.webdriver.common.service
import selenium.webdriver.remote.webdriver import selenium.webdriver.remote.webdriver
from .cdp import CDP
from .options import ChromeOptions from .options import ChromeOptions
from .patcher import IS_POSIX, Patcher from .patcher import IS_POSIX, Patcher
from .reactor import Reactor from .reactor import Reactor
from .cdp import CDP
__all__ = ("Chrome", "ChromeOptions", "Patcher", "Reactor", "CDP", "find_chrome_executable") __all__ = (
"Chrome",
"ChromeOptions",
"Patcher",
"Reactor",
"CDP",
"find_chrome_executable",
)
logger = logging.getLogger("uc") logger = logging.getLogger("uc")
logger.setLevel(logging.getLogger().getEffectiveLevel()) logger.setLevel(logging.getLogger().getEffectiveLevel())
class Chrome(selenium.webdriver.Chrome): class Chrome(selenium.webdriver.chrome.webdriver.WebDriver):
""" """
Controls the ChromeDriver and allows you to drive the browser. Controls the ChromeDriver and allows you to drive the browser.
@ -67,6 +74,7 @@ class Chrome(selenium.webdriver.Chrome):
""" """
_instances = set() _instances = set()
session_id = None
def __init__( def __init__(
self, self,
@ -129,9 +137,6 @@ class Chrome(selenium.webdriver.Chrome):
Specify whether you want to use the browser in headless mode. Specify whether you want to use the browser in headless mode.
warning: this lowers undetectability and not fully supported. warning: this lowers undetectability and not fully supported.
emulate_touch: bool, optional, default: False
if set to True, patches window.maxTouchPoints to always return non-zero
delay: int, optional, default: 5 delay: int, optional, default: 5
delay in seconds to wait before giving back control. delay in seconds to wait before giving back control.
this is used only when using the context manager this is used only when using the context manager
@ -139,7 +144,7 @@ class Chrome(selenium.webdriver.Chrome):
5 seconds is a foolproof value. 5 seconds is a foolproof value.
version_main: int, optional, default: None (=auto) version_main: int, optional, default: None (=auto)
if you, for god knows whatever reason, use if you, for god knows whatever reason, use
an older version of Chrome. You can specify it's full rounded version number an older version of Chrome. You can specify it's full rounded version number
here. Example: 87 for all versions of 87 here. Example: 87 for all versions of 87
@ -149,14 +154,20 @@ class Chrome(selenium.webdriver.Chrome):
setting it is not recommended, unless you know the implications and think setting it is not recommended, unless you know the implications and think
you might need it. you might need it.
""" """
patcher = Patcher(executable_path=executable_path, force=patcher_force_close, version_main=version_main)
patcher = Patcher(
executable_path=executable_path,
force=patcher_force_close,
version_main=version_main,
)
patcher.auto() patcher.auto()
if not options: if not options:
options = ChromeOptions() options = ChromeOptions()
try: try:
if options.session and options.session is not None: if hasattr(options, "_session") and options._session is not None:
# prevent reuse of options, # prevent reuse of options,
# as it just appends arguments, not replace them # as it just appends arguments, not replace them
# you'll get conflicts starting chrome # you'll get conflicts starting chrome
@ -164,7 +175,7 @@ class Chrome(selenium.webdriver.Chrome):
except AttributeError: except AttributeError:
pass pass
options.session = self options._session = self
debug_port = selenium.webdriver.common.service.utils.free_port() debug_port = selenium.webdriver.common.service.utils.free_port()
debug_host = "127.0.0.1" debug_host = "127.0.0.1"
@ -250,9 +261,9 @@ class Chrome(selenium.webdriver.Chrome):
options.add_argument("--window-size=1920,1080") options.add_argument("--window-size=1920,1080")
options.add_argument("--start-maximized") options.add_argument("--start-maximized")
options.add_argument("--no-sandbox") options.add_argument("--no-sandbox")
# fixes "could not connect to chrome" error when running # fixes "could not connect to chrome" error when running
# on linux using privileged user like root (which i don't recommend) # on linux using privileged user like root (which i don't recommend)
options.add_argument( options.add_argument(
"--log-level=%d" % log_level "--log-level=%d" % log_level
or divmod(logging.getLogger().getEffectiveLevel(), 10)[0] or divmod(logging.getLogger().getEffectiveLevel(), 10)[0]
@ -280,15 +291,16 @@ class Chrome(selenium.webdriver.Chrome):
if not desired_capabilities: if not desired_capabilities:
desired_capabilities = options.to_capabilities() desired_capabilities = options.to_capabilities()
self.browser = subprocess.Popen( self.browser = subprocess.Popen(
[options.binary_location, *options.arguments], [options.binary_location, *options.arguments],
stdin=subprocess.PIPE, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, stderr=subprocess.PIPE,
close_fds=True, close_fds=IS_POSIX,
) )
super().__init__( super(Chrome, self).__init__(
executable_path=patcher.executable_path, executable_path=patcher.executable_path,
port=port, port=port,
options=options, options=options,
@ -320,10 +332,22 @@ class Chrome(selenium.webdriver.Chrome):
reactor.start() reactor.start()
self.reactor = reactor self.reactor = reactor
if options.headless: if options.headless:
self._configure_headless() self._configure_headless()
orig_get = self.get
# def get_wrapped(*args, **kwargs):
# self.execute_cdp_cmd(
# "Network.setExtraHTTPHeaders",
# {"headers": {"dnt": "1", "cache-control": "no-cache"}},
# )
#
# return orig_get(*args, **kwargs)
#
# self.get = get_wrapped
def _configure_headless(self): def _configure_headless(self):
orig_get = self.get orig_get = self.get
@ -508,12 +532,12 @@ class Chrome(selenium.webdriver.Chrome):
self.reactor.add_event_handler(event_name, callback) self.reactor.add_event_handler(event_name, callback)
return self.reactor.handlers return self.reactor.handlers
return False return False
def clear_cdp_listeners(self): def clear_cdp_listeners(self):
if self.reactor and isinstance(self.reactor, Reactor): if self.reactor and isinstance(self.reactor, Reactor):
self.reactor.handlers.clear() self.reactor.handlers.clear()
def tab_new(self, url:str): def tab_new(self, url: str):
""" """
this opens a url in a new tab. this opens a url in a new tab.
apparently, that passes all tests directly! apparently, that passes all tests directly!
@ -526,17 +550,18 @@ class Chrome(selenium.webdriver.Chrome):
------- -------
""" """
if not hasattr(self, 'cdp'): if not hasattr(self, "cdp"):
from .cdp import CDP from .cdp import CDP
self.cdp = CDP(self.options) self.cdp = CDP(self.options)
self.cdp.tab_new(url) self.cdp.tab_new(url)
def reconnect(self): def reconnect(self, timeout=0.1):
try: try:
self.service.stop() self.service.stop()
except Exception as e: except Exception as e:
logger.debug(e) logger.debug(e)
time.sleep(timeout)
try: try:
self.service.start() self.service.start()
except Exception as e: except Exception as e:
@ -550,20 +575,20 @@ class Chrome(selenium.webdriver.Chrome):
def start_session(self, capabilities=None, browser_profile=None): def start_session(self, capabilities=None, browser_profile=None):
if not capabilities: if not capabilities:
capabilities = self.options.to_capabilities() capabilities = self.options.to_capabilities()
super().start_session(capabilities, browser_profile) super(Chrome, self).start_session(capabilities, browser_profile)
def quit(self): def quit(self):
logger.debug("closing webdriver") logger.debug("closing webdriver")
self.service.process.kill()
try: try:
if self.reactor and isinstance(self.reactor, Reactor): if self.reactor and isinstance(self.reactor, Reactor):
self.reactor.event.set() self.reactor.event.set()
super().quit()
except Exception: # noqa except Exception: # noqa
pass pass
try: try:
logger.debug("killing browser") logger.debug("killing browser")
self.browser.kill() self.browser.terminate()
self.browser.wait(1) self.browser.wait(1)
except TimeoutError as e: except TimeoutError as e:
@ -571,10 +596,12 @@ class Chrome(selenium.webdriver.Chrome):
except Exception: # noqa except Exception: # noqa
pass pass
if hasattr(self, 'keep_user_data_dir') \ if (
and not self.keep_user_data_dir \ hasattr(self, "keep_user_data_dir")
or self.keep_user_data_dir is False: and hasattr(self, "user_data_dir")
for _ in range(3): and not self.keep_user_data_dir
):
for _ in range(5):
try: try:
logger.debug("removing profile : %s" % self.user_data_dir) logger.debug("removing profile : %s" % self.user_data_dir)
shutil.rmtree(self.user_data_dir, ignore_errors=False) shutil.rmtree(self.user_data_dir, ignore_errors=False)
@ -585,15 +612,16 @@ class Chrome(selenium.webdriver.Chrome):
"permission error. files are still in use/locked. retying..." "permission error. files are still in use/locked. retying..."
) )
except (RuntimeError, OSError) as e: except (RuntimeError, OSError) as e:
logger.debug( logger.debug("%s retying..." % e)
"%s retying..." % e
)
else: else:
break break
time.sleep(.25) time.sleep(0.1)
def __del__(self): def __del__(self):
logger.debug("Chrome.__del__") try:
self.service.process.kill()
except:
pass
self.quit() self.quit()
def __enter__(self): def __enter__(self):
@ -608,23 +636,6 @@ class Chrome(selenium.webdriver.Chrome):
def __hash__(self): def __hash__(self):
return hash(self.options.debugger_address) return hash(self.options.debugger_address)
def find_elements_by_text(self, text: str):
for elem in self.find_elements_by_css_selector("*"):
try:
if text.lower() in elem.text.lower():
yield elem
except Exception as e:
logger.debug("find_elements_by_text: %s" % e)
def find_element_by_text(self, text: str, selector=None):
if not selector:
selector = "*"
for elem in self.find_elements_by_css_selector(selector):
try:
if text.lower() in elem.text.lower():
return elem
except Exception as e:
logger.debug("find_elements_by_text: {}".format(e))
def find_chrome_executable(): def find_chrome_executable():