3.4.4 - Fixed

This commit is contained in:
UltrafunkAmsterdam 2023-02-08 01:27:50 +01:00
parent 305803ca95
commit d3fe33fceb
2 changed files with 303 additions and 340 deletions

View File

@ -17,7 +17,7 @@ by UltrafunkAmsterdam (https://github.com/ultrafunkamsterdam)
from __future__ import annotations from __future__ import annotations
__version__ = "3.4.2" __version__ = "3.4.4"
import json import json
import logging import logging
@ -48,19 +48,19 @@ from .webelement import WebElement
__all__ = ( __all__ = (
"Chrome" , "Chrome",
"ChromeOptions" , "ChromeOptions",
"Patcher" , "Patcher",
"Reactor" , "Reactor",
"CDP" , "CDP",
"find_chrome_executable" , "find_chrome_executable",
) )
logger = logging.getLogger( "uc" ) logger = logging.getLogger("uc")
logger.setLevel( logging.getLogger().getEffectiveLevel() ) logger.setLevel(logging.getLogger().getEffectiveLevel())
class Chrome( selenium.webdriver.chrome.webdriver.WebDriver ): class Chrome(selenium.webdriver.chrome.webdriver.WebDriver):
""" """
Controls the ChromeDriver and allows you to drive the browser. Controls the ChromeDriver and allows you to drive the browser.
@ -96,36 +96,35 @@ class Chrome( selenium.webdriver.chrome.webdriver.WebDriver ):
-------------------------------------------------------------------------- --------------------------------------------------------------------------
""" """
_instances = set() _instances = set()
session_id = None session_id = None
debug = False debug = False
def __init__( def __init__(
self , self,
options = None , options=None,
user_data_dir = None , user_data_dir=None,
driver_executable_path = None , driver_executable_path=None,
browser_executable_path = None , browser_executable_path=None,
port = 0 , port=0,
enable_cdp_events = False , enable_cdp_events=False,
service_args = None , service_args=None,
service_creationflags = None , service_creationflags=None,
desired_capabilities = None , desired_capabilities=None,
advanced_elements = False , advanced_elements=False,
service_log_path = None , service_log_path=None,
keep_alive = True , keep_alive=True,
log_level = 0 , log_level=0,
headless = False , headless=False,
version_main = None , version_main=None,
patcher_force_close = False , patcher_force_close=False,
suppress_welcome = True , suppress_welcome=True,
use_subprocess = True , use_subprocess=True,
debug = False , debug=False,
no_sandbox = True , no_sandbox=True,
**kw , **kw,
): ):
""" """
Creates a new instance of the chrome driver. Creates a new instance of the chrome driver.
@ -236,30 +235,30 @@ class Chrome( selenium.webdriver.chrome.webdriver.WebDriver ):
this option has a default of True since many people seem to run this as root (....) , and chrome does not start this option has a default of True since many people seem to run this as root (....) , and chrome does not start
when running as root without using --no-sandbox flag. when running as root without using --no-sandbox flag.
""" """
finalize( self , self._ensure_close , self ) finalize(self, self._ensure_close, self)
self.debug = debug self.debug = debug
self.patcher = Patcher( self.patcher = Patcher(
executable_path = driver_executable_path , executable_path=driver_executable_path,
force = patcher_force_close , force=patcher_force_close,
version_main = version_main , version_main=version_main,
) )
self.patcher.auto() self.patcher.auto()
# self.patcher = patcher # self.patcher = patcher
if not options: if not options:
options = ChromeOptions() options = ChromeOptions()
try: try:
if hasattr( options , "_session" ) and options._session is not None: if hasattr(options, "_session") and options._session is not None:
# prevent reuse of options, # prevent reuse of options,
# as it just appends arguments, not replace them # as it just appends arguments, not replace them
# you'll get conflicts starting chrome # you'll get conflicts starting chrome
raise RuntimeError( "you cannot reuse the ChromeOptions object" ) raise RuntimeError("you cannot reuse the ChromeOptions object")
except AttributeError: except AttributeError:
pass pass
options._session = self options._session = self
if not options.debugger_address: if not options.debugger_address:
debug_port = ( debug_port = (
port port
@ -267,209 +266,207 @@ class Chrome( selenium.webdriver.chrome.webdriver.WebDriver ):
else selenium.webdriver.common.service.utils.free_port() else selenium.webdriver.common.service.utils.free_port()
) )
debug_host = "127.0.0.1" debug_host = "127.0.0.1"
options.debugger_address = "%s:%d" % (debug_host , debug_port) options.debugger_address = "%s:%d" % (debug_host, debug_port)
else: else:
debug_host , debug_port = options.debugger_address.split( ":" ) debug_host, debug_port = options.debugger_address.split(":")
debug_port = int( debug_port ) debug_port = int(debug_port)
if enable_cdp_events: if enable_cdp_events:
options.set_capability( options.set_capability(
"goog:loggingPrefs" , { "performance": "ALL" , "browser": "ALL" } "goog:loggingPrefs", {"performance": "ALL", "browser": "ALL"}
) )
options.add_argument( "--remote-debugging-host=%s" % debug_host ) options.add_argument("--remote-debugging-host=%s" % debug_host)
options.add_argument( "--remote-debugging-port=%s" % debug_port ) options.add_argument("--remote-debugging-port=%s" % debug_port)
if user_data_dir: if user_data_dir:
options.add_argument( "--user-data-dir=%s" % user_data_dir ) options.add_argument("--user-data-dir=%s" % user_data_dir)
language , keep_user_data_dir = None , bool( user_data_dir ) language, keep_user_data_dir = None, bool(user_data_dir)
# see if a custom user profile is specified in options # see if a custom user profile is specified in options
for arg in options.arguments: for arg in options.arguments:
if "lang" in arg: if "lang" in arg:
m = re.search( "(?:--)?lang(?:[ =])?(.*)" , arg ) m = re.search("(?:--)?lang(?:[ =])?(.*)", arg)
try: try:
language = m[ 1 ] language = m[1]
except IndexError: except IndexError:
logger.debug( "will set the language to en-US,en;q=0.9" ) logger.debug("will set the language to en-US,en;q=0.9")
language = "en-US,en;q=0.9" language = "en-US,en;q=0.9"
if "user-data-dir" in arg: if "user-data-dir" in arg:
m = re.search( "(?:--)?user-data-dir(?:[ =])?(.*)" , arg ) m = re.search("(?:--)?user-data-dir(?:[ =])?(.*)", arg)
try: try:
user_data_dir = m[ 1 ] user_data_dir = m[1]
logger.debug( logger.debug(
"user-data-dir found in user argument %s => %s" % (arg , m[ 1 ]) "user-data-dir found in user argument %s => %s" % (arg, m[1])
) )
keep_user_data_dir = True keep_user_data_dir = True
except IndexError: except IndexError:
logger.debug( logger.debug(
"no user data dir could be extracted from supplied argument %s " "no user data dir could be extracted from supplied argument %s "
% arg % arg
) )
if not user_data_dir: if not user_data_dir:
# backward compatiblity # backward compatiblity
# check if an old uc.ChromeOptions is used, and extract the user data dir # check if an old uc.ChromeOptions is used, and extract the user data dir
if hasattr( options , "user_data_dir" ) and getattr( if hasattr(options, "user_data_dir") and getattr(
options , "user_data_dir" , None options, "user_data_dir", None
): ):
import warnings import warnings
warnings.warn( warnings.warn(
"using ChromeOptions.user_data_dir might stop working in future versions." "using ChromeOptions.user_data_dir might stop working in future versions."
"use uc.Chrome(user_data_dir='/xyz/some/data') in case you need existing profile folder" "use uc.Chrome(user_data_dir='/xyz/some/data') in case you need existing profile folder"
) )
options.add_argument( "--user-data-dir=%s" % options.user_data_dir ) options.add_argument("--user-data-dir=%s" % options.user_data_dir)
keep_user_data_dir = True keep_user_data_dir = True
logger.debug( logger.debug(
"user_data_dir property found in options object: %s" % user_data_dir "user_data_dir property found in options object: %s" % user_data_dir
) )
else: else:
user_data_dir = os.path.normpath( tempfile.mkdtemp() ) user_data_dir = os.path.normpath(tempfile.mkdtemp())
keep_user_data_dir = False keep_user_data_dir = False
arg = "--user-data-dir=%s" % user_data_dir arg = "--user-data-dir=%s" % user_data_dir
options.add_argument( arg ) options.add_argument(arg)
logger.debug( logger.debug(
"created a temporary folder in which the user-data (profile) will be stored during this\n" "created a temporary folder in which the user-data (profile) will be stored during this\n"
"session, and added it to chrome startup arguments: %s" % arg "session, and added it to chrome startup arguments: %s" % arg
) )
if not language: if not language:
try: try:
import locale import locale
language = locale.getdefaultlocale()[ 0 ].replace( "_" , "-" ) language = locale.getdefaultlocale()[0].replace("_", "-")
except Exception: except Exception:
pass pass
if not language: if not language:
language = "en-US" language = "en-US"
options.add_argument( "--lang=%s" % language ) options.add_argument("--lang=%s" % language)
if not options.binary_location: if not options.binary_location:
options.binary_location = ( options.binary_location = (
browser_executable_path or find_chrome_executable() browser_executable_path or find_chrome_executable()
) )
self._delay = 3 self._delay = 3
self.user_data_dir = user_data_dir self.user_data_dir = user_data_dir
self.keep_user_data_dir = keep_user_data_dir self.keep_user_data_dir = keep_user_data_dir
if suppress_welcome: if suppress_welcome:
options.arguments.extend( [ "--no-default-browser-check" , "--no-first-run" ] ) options.arguments.extend(["--no-default-browser-check", "--no-first-run"])
if no_sandbox: if no_sandbox:
options.arguments.extend( [ "--no-sandbox" , "--test-type" ] ) options.arguments.extend(["--no-sandbox", "--test-type"])
if headless or options.headless: if headless or options.headless:
options.headless = True options.headless = True
options.add_argument( "--window-size=1920,1080" ) options.add_argument("--window-size=1920,1080")
options.add_argument( "--start-maximized" ) options.add_argument("--start-maximized")
options.add_argument( "--no-sandbox" ) options.add_argument("--no-sandbox")
# fixes "could not connect to chrome" error when running # fixes "could not connect to chrome" error when running
# on linux using privileged user like root (which i don't recommend) # on linux using privileged user like root (which i don't recommend)
options.add_argument( options.add_argument(
"--log-level=%d" % log_level "--log-level=%d" % log_level
or divmod( logging.getLogger().getEffectiveLevel() , 10 )[ 0 ] or divmod(logging.getLogger().getEffectiveLevel(), 10)[0]
) )
if hasattr( options , "handle_prefs" ): if hasattr(options, "handle_prefs"):
options.handle_prefs( user_data_dir ) options.handle_prefs(user_data_dir)
# fix exit_type flag to prevent tab-restore nag # fix exit_type flag to prevent tab-restore nag
try: try:
with open( with open(
os.path.join( user_data_dir , "Default/Preferences" ) , os.path.join(user_data_dir, "Default/Preferences"),
encoding = "latin1" , encoding="latin1",
mode = "r+" , mode="r+",
) as fs: ) as fs:
config = json.load( fs ) config = json.load(fs)
if config[ "profile" ][ "exit_type" ] is not None: if config["profile"]["exit_type"] is not None:
# fixing the restore-tabs-nag # fixing the restore-tabs-nag
config[ "profile" ][ "exit_type" ] = None config["profile"]["exit_type"] = None
fs.seek( 0 , 0 ) fs.seek(0, 0)
json.dump( config , fs ) json.dump(config, fs)
fs.truncate() # the file might be shorter fs.truncate() # the file might be shorter
logger.debug( "fixed exit_type flag" ) logger.debug("fixed exit_type flag")
except Exception as e: except Exception as e:
logger.debug( "did not find a bad exit_type flag " ) logger.debug("did not find a bad exit_type flag ")
self.options = options self.options = options
if not desired_capabilities: if not desired_capabilities:
desired_capabilities = options.to_capabilities() desired_capabilities = options.to_capabilities()
if not use_subprocess: if not use_subprocess:
self.browser_pid = start_detached( self.browser_pid = start_detached(
options.binary_location , *options.arguments options.binary_location, *options.arguments
) )
else: else:
browser = subprocess.Popen( browser = subprocess.Popen(
[ options.binary_location , *options.arguments ] , [options.binary_location, *options.arguments],
stdin = subprocess.PIPE , stdin=subprocess.PIPE,
stdout = subprocess.PIPE , stdout=subprocess.PIPE,
stderr = subprocess.PIPE , stderr=subprocess.PIPE,
close_fds = IS_POSIX , close_fds=IS_POSIX,
) )
self.browser_pid = browser.pid self.browser_pid = browser.pid
if service_creationflags: if service_creationflags:
service = selenium.webdriver.common.service.Service( service = selenium.webdriver.common.service.Service(
self.patcher.executable_path , port , service_args , service_log_path self.patcher.executable_path, port, service_args, service_log_path
) )
for attr_name in ("creationflags" , "creation_flags"): for attr_name in ("creationflags", "creation_flags"):
if hasattr( service , attr_name ): if hasattr(service, attr_name):
setattr( service , attr_name , service_creationflags ) setattr(service, attr_name, service_creationflags)
break break
else: else:
service = None service = None
super( Chrome , self ).__init__( super(Chrome, self).__init__(
executable_path = self.patcher.executable_path , executable_path=self.patcher.executable_path,
port = port , port=port,
options = options , options=options,
service_args = service_args , service_args=service_args,
desired_capabilities = desired_capabilities , desired_capabilities=desired_capabilities,
service_log_path = service_log_path , service_log_path=service_log_path,
keep_alive = keep_alive , keep_alive=keep_alive,
service = service , # needed or the service will be re-created service=service, # needed or the service will be re-created
) )
self.reactor = None self.reactor = None
if enable_cdp_events: if enable_cdp_events:
if logging.getLogger().getEffectiveLevel() == logging.DEBUG: if logging.getLogger().getEffectiveLevel() == logging.DEBUG:
logging.getLogger( logging.getLogger(
"selenium.webdriver.remote.remote_connection" "selenium.webdriver.remote.remote_connection"
).setLevel( 20 ) ).setLevel(20)
reactor = Reactor( self ) reactor = Reactor(self)
reactor.start() reactor.start()
self.reactor = reactor self.reactor = reactor
if advanced_elements: if advanced_elements:
self._web_element_cls = UCWebElement self._web_element_cls = UCWebElement
else: else:
self._web_element_cls = WebElement self._web_element_cls = WebElement
if options.headless: if options.headless:
self._configure_headless() self._configure_headless()
def _configure_headless(self):
def _configure_headless( self ):
orig_get = self.get orig_get = self.get
logger.info( "setting properties for headless" ) logger.info("setting properties for headless")
def get_wrapped(*args, **kwargs):
def get_wrapped( *args , **kwargs ): if self.execute_script("return navigator.webdriver"):
if self.execute_script( "return navigator.webdriver" ): logger.info("patch navigator.webdriver")
logger.info( "patch navigator.webdriver" )
self.execute_cdp_cmd( self.execute_cdp_cmd(
"Page.addScriptToEvaluateOnNewDocument" , "Page.addScriptToEvaluateOnNewDocument",
{ {
"source": """ "source": """
@ -486,20 +483,20 @@ class Chrome( selenium.webdriver.chrome.webdriver.WebDriver ):
}), }),
}); });
""" """
} , },
) )
logger.info( "patch user-agent string" ) logger.info("patch user-agent string")
self.execute_cdp_cmd( self.execute_cdp_cmd(
"Network.setUserAgentOverride" , "Network.setUserAgentOverride",
{ {
"userAgent": self.execute_script( "userAgent": self.execute_script(
"return navigator.userAgent" "return navigator.userAgent"
).replace( "Headless" , "" ) ).replace("Headless", "")
} , },
) )
self.execute_cdp_cmd( self.execute_cdp_cmd(
"Page.addScriptToEvaluateOnNewDocument" , "Page.addScriptToEvaluateOnNewDocument",
{ {
"source": """ "source": """
Object.defineProperty(navigator, 'maxTouchPoints', {get: () => 1}); Object.defineProperty(navigator, 'maxTouchPoints', {get: () => 1});
@ -597,14 +594,12 @@ class Chrome( selenium.webdriver.chrome.webdriver.WebDriver ):
// eslint-disable-next-line // eslint-disable-next-line
Function.prototype.toString = functionToString Function.prototype.toString = functionToString
""" """
} , },
) )
return orig_get( *args , **kwargs ) return orig_get(*args, **kwargs)
self.get = get_wrapped self.get = get_wrapped
# def _get_cdc_props(self): # def _get_cdc_props(self):
# return self.execute_script( # return self.execute_script(
# """ # """
@ -633,36 +628,32 @@ class Chrome( selenium.webdriver.chrome.webdriver.WebDriver ):
# """ # """
# }, # },
# ) # )
def get( self , url ): def get(self, url):
# if self._get_cdc_props(): # if self._get_cdc_props():
# self._hook_remove_cdc_props() # self._hook_remove_cdc_props()
return super().get( url ) return super().get(url)
def add_cdp_listener(self, event_name, callback):
def add_cdp_listener( self , event_name , callback ):
if ( if (
self.reactor self.reactor
and self.reactor is not None and self.reactor is not None
and isinstance( self.reactor , Reactor ) and isinstance(self.reactor, Reactor)
): ):
self.reactor.add_event_handler( event_name , callback ) self.reactor.add_event_handler(event_name, callback)
return self.reactor.handlers return self.reactor.handlers
return False return False
def clear_cdp_listeners(self):
def clear_cdp_listeners( self ): if self.reactor and isinstance(self.reactor, Reactor):
if self.reactor and isinstance( self.reactor , Reactor ):
self.reactor.handlers.clear() self.reactor.handlers.clear()
def window_new(self):
def window_new( self ):
self.execute( self.execute(
selenium.webdriver.remote.command.Command.NEW_WINDOW , { "type": "window" } selenium.webdriver.remote.command.Command.NEW_WINDOW, {"type": "window"}
) )
def tab_new(self, url: str):
def tab_new( self , url: str ):
""" """
this opens a url in a new tab. this opens a url in a new tab.
apparently, that passes all tests directly! apparently, that passes all tests directly!
@ -675,135 +666,125 @@ class Chrome( selenium.webdriver.chrome.webdriver.WebDriver ):
------- -------
""" """
if not hasattr( self , "cdp" ): if not hasattr(self, "cdp"):
from .cdp import CDP from .cdp import CDP
cdp = CDP( self.options ) cdp = CDP(self.options)
cdp.tab_new( url ) cdp.tab_new(url)
def reconnect(self, timeout=0.1):
def reconnect( self , timeout = 0.1 ):
try: try:
self.service.stop() self.service.stop()
except Exception as e: except Exception as e:
logger.debug( e ) logger.debug(e)
time.sleep( timeout ) time.sleep(timeout)
try: try:
self.service.start() self.service.start()
except Exception as e: except Exception as e:
logger.debug( e ) logger.debug(e)
try: try:
self.start_session() self.start_session()
except Exception as e: except Exception as e:
logger.debug( e ) logger.debug(e)
def start_session(self, capabilities=None, browser_profile=None):
def start_session( self , capabilities = None , browser_profile = None ):
if not capabilities: if not capabilities:
capabilities = self.options.to_capabilities() capabilities = self.options.to_capabilities()
super( selenium.webdriver.chrome.webdriver.WebDriver , self ).start_session( super(selenium.webdriver.chrome.webdriver.WebDriver, self).start_session(
capabilities , browser_profile capabilities, browser_profile
) )
# super(Chrome, self).start_session(capabilities, browser_profile) # super(Chrome, self).start_session(capabilities, browser_profile)
def quit(self):
def quit( self ):
try: try:
self.service.process.kill() self.service.process.kill()
logger.debug( "webdriver process ended" ) logger.debug("webdriver process ended")
except (AttributeError , RuntimeError , OSError): except (AttributeError, RuntimeError, OSError):
pass pass
try: try:
self.reactor.event.set() self.reactor.event.set()
logger.debug( "shutting down reactor" ) logger.debug("shutting down reactor")
except AttributeError: except AttributeError:
pass pass
try: try:
os.kill( self.browser_pid , 15 ) os.kill(self.browser_pid, 15)
logger.debug( "gracefully closed browser" ) logger.debug("gracefully closed browser")
except Exception as e: # noqa except Exception as e: # noqa
logger.debug( e , exc_info = True ) logger.debug(e, exc_info=True)
if ( if (
hasattr( self , "keep_user_data_dir" ) hasattr(self, "keep_user_data_dir")
and hasattr( self , "user_data_dir" ) and hasattr(self, "user_data_dir")
and not self.keep_user_data_dir and not self.keep_user_data_dir
): ):
for _ in range( 5 ): for _ in range(5):
try: try:
shutil.rmtree( self.user_data_dir , ignore_errors = False ) shutil.rmtree(self.user_data_dir, ignore_errors=False)
except FileNotFoundError: except FileNotFoundError:
pass pass
except (RuntimeError , OSError , PermissionError) as e: except (RuntimeError, OSError, PermissionError) as e:
logger.debug( logger.debug(
"When removing the temp profile, a %s occured: %s\nretrying..." "When removing the temp profile, a %s occured: %s\nretrying..."
% (e.__class__.__name__ , e) % (e.__class__.__name__, e)
) )
else: else:
logger.debug( "successfully removed %s" % self.user_data_dir ) logger.debug("successfully removed %s" % self.user_data_dir)
break break
time.sleep( 0.1 ) time.sleep(0.1)
# dereference patcher, so patcher can start cleaning up as well. # dereference patcher, so patcher can start cleaning up as well.
# this must come last, otherwise it will throw 'in use' errors # this must come last, otherwise it will throw 'in use' errors
self.patcher = None self.patcher = None
def __getattribute__(self, item):
def __getattribute__( self , item ): if not super().__getattribute__("debug"):
if not super().__getattribute__( "debug" ): return super().__getattribute__(item)
return super().__getattribute__( item )
else: else:
import inspect import inspect
original = super().__getattribute__( item ) original = super().__getattribute__(item)
if inspect.ismethod( original ) and not inspect.isclass( original ): if inspect.ismethod(original) and not inspect.isclass(original):
def newfunc( *args , **kwargs ):
def newfunc(*args, **kwargs):
logger.debug( logger.debug(
"calling %s with args %s and kwargs %s\n" "calling %s with args %s and kwargs %s\n"
% (original.__qualname__ , args , kwargs) % (original.__qualname__, args, kwargs)
) )
return original( *args , **kwargs ) return original(*args, **kwargs)
return newfunc return newfunc
return original return original
def __enter__(self):
def __enter__( self ):
return self return self
def __exit__(self, exc_type, exc_val, exc_tb):
def __exit__( self , exc_type , exc_val , exc_tb ):
self.service.stop() self.service.stop()
time.sleep( self._delay ) time.sleep(self._delay)
self.service.start() self.service.start()
self.start_session() self.start_session()
def __hash__(self):
def __hash__( self ): return hash(self.options.debugger_address)
return hash( self.options.debugger_address )
def __dir__(self):
return object.__dir__(self)
def __dir__( self ):
return object.__dir__( self ) def __del__(self):
def __del__( self ):
try: try:
self.service.process.kill() self.service.process.kill()
except: # noqa except: # noqa
pass pass
self.quit() self.quit()
@classmethod @classmethod
def _ensure_close( cls , self ): def _ensure_close(cls, self):
# needs to be a classmethod so finalize can find the reference # needs to be a classmethod so finalize can find the reference
logger.info( "ensuring close" ) logger.info("ensuring close")
if ( if (
hasattr( self , "service" ) hasattr(self, "service")
and hasattr( self.service , "process" ) and hasattr(self.service, "process")
and hasattr( self.service.process , "kill" ) and hasattr(self.service.process, "kill")
): ):
self.service.process.kill() self.service.process.kill()
@ -820,34 +801,34 @@ def find_chrome_executable():
""" """
candidates = set() candidates = set()
if IS_POSIX: if IS_POSIX:
for item in os.environ.get( "PATH" ).split( os.pathsep ): for item in os.environ.get("PATH").split(os.pathsep):
for subitem in ( for subitem in (
"google-chrome" , "google-chrome",
"chromium" , "chromium",
"chromium-browser" , "chromium-browser",
"chrome" , "chrome",
"google-chrome-stable" , "google-chrome-stable",
): ):
candidates.add( os.sep.join( (item , subitem) ) ) candidates.add(os.sep.join((item, subitem)))
if "darwin" in sys.platform: if "darwin" in sys.platform:
candidates.update( candidates.update(
[ [
"/Applications/Google Chrome.app/Contents/MacOS/Google Chrome" , "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome",
"/Applications/Chromium.app/Contents/MacOS/Chromium" , "/Applications/Chromium.app/Contents/MacOS/Chromium",
] ]
) )
else: else:
for item in map( for item in map(
os.environ.get , os.environ.get,
("PROGRAMFILES" , "PROGRAMFILES(X86)" , "LOCALAPPDATA" , "PROGRAMW6432") , ("PROGRAMFILES", "PROGRAMFILES(X86)", "LOCALAPPDATA", "PROGRAMW6432"),
): ):
if item is not None: if item is not None:
for subitem in ( for subitem in (
"Google/Chrome/Application" , "Google/Chrome/Application",
"Google/Chrome Beta/Application" , "Google/Chrome Beta/Application",
"Google/Chrome Canary/Application" , "Google/Chrome Canary/Application",
): ):
candidates.add( os.sep.join( (item , subitem , "chrome.exe") ) ) candidates.add(os.sep.join((item, subitem, "chrome.exe")))
for candidate in candidates: for candidate in candidates:
if os.path.exists( candidate ) and os.access( candidate , os.X_OK ): if os.path.exists(candidate) and os.access(candidate, os.X_OK):
return os.path.normpath( candidate ) return os.path.normpath(candidate)

View File

@ -94,9 +94,7 @@ class Patcher(object):
self.version_full = None self.version_full = None
def auto(self, executable_path=None, force=False, version_main=None): def auto(self, executable_path=None, force=False, version_main=None):
if executable_path: if executable_path:
self.executable_path = executable_path self.executable_path = executable_path
self._custom_exe_path = True self._custom_exe_path = True
@ -207,12 +205,7 @@ class Patcher(object):
@staticmethod @staticmethod
def gen_random_cdc(): def gen_random_cdc():
# make cdc_variables without underscores
cdc = random.choices(string.ascii_letters, k=27) cdc = random.choices(string.ascii_letters, k=27)
# cdc[-6:-4] = map(str.upper, cdc[-6:-4])
# cdc[2] = cdc[0]
# cdc[3] = "_"
return "".join(cdc).encode() return "".join(cdc).encode()
def is_binary_patched(self, executable_path=None): def is_binary_patched(self, executable_path=None):
@ -222,44 +215,33 @@ class Patcher(object):
return fh.read().find(b"undetected chromedriver") != -1 return fh.read().find(b"undetected chromedriver") != -1
except FileNotFoundError: except FileNotFoundError:
return False return False
def patch_exe(self): def patch_exe(self):
start = time.perf_counter() start = time.perf_counter()
logger.info("patching driver executable %s" % self.executable_path) logger.info("patching driver executable %s" % self.executable_path)
with io.open(self.executable_path, "r+b") as fh: with io.open(self.executable_path, "r+b") as fh:
#content = fh.read() content = fh.read()
#match_injected_codeblock = re.search(rb"{window.*;}", content) # match_injected_codeblock = re.search(rb"{window.*;}", content)
#if match_injected_codeblock: match_injected_codeblock = re.search(rb"\{window\.cdc.*?;\}", content)
# target_bytes = match_injected_codeblock[0] if match_injected_codeblock:
# new_target_bytes = ( target_bytes = match_injected_codeblock[0]
# b'{console.log("undetected chromedriver 1337!")}'.ljust( new_target_bytes = (
# len(target_bytes), b" " b'{console.log("undetected chromedriver 1337!")}'.ljust(
# ) len(target_bytes), b" "
# ) )
# new_content = content.replace(target_bytes, new_target_bytes) )
# if new_content == content: new_content = content.replace(target_bytes, new_target_bytes)
# logger.warning( if new_content == content:
# "something went wrong patching the driver binary. could not find injection code block" logger.warning(
# ) "something went wrong patching the driver binary. could not find injection code block"
# else: )
# logger.debug( else:
# "found block:\n%s\nreplacing with:\n%s" logger.debug(
# % (target_bytes, new_target_bytes) "found block:\n%s\nreplacing with:\n%s"
# ) % (target_bytes, new_target_bytes)
# fh.seek(0) )
# fh.write(new_content) fh.seek(0)
fh.write(new_content)
# we just keep the cdc variables as they can't be injected anyways so no harm
# keeping for reference
# fh.seek(0)
for line in iter( lambda: fh.readline() , b"" ):
if b'cdc_' in line:
fh.seek( -len( line ) , 1 )
new_line = re.sub( b"cdc_.{22}_" , self.gen_random_cdc() , line )
logger.debug( 'replaced %s\n\twith:%s' % (line , new_line) )
fh.write( new_line )
# else:
# logger.info("%s seems already patched ?!?!" % self.executable_path)
logger.debug( logger.debug(
"patching took us {:.2f} seconds".format(time.perf_counter() - start) "patching took us {:.2f} seconds".format(time.perf_counter() - start)
) )