From 305803ca954ea94a0778daebd1d98bb342da514c Mon Sep 17 00:00:00 2001 From: UltrafunkAmsterdam Date: Sun, 5 Feb 2023 18:37:28 +0100 Subject: [PATCH 1/2] fix for linux find_elements: SyntaxError: missing ) after argument list --- undetected_chromedriver/__init__.py | 601 ++++++++++++++-------------- undetected_chromedriver/patcher.py | 73 ++-- 2 files changed, 348 insertions(+), 326 deletions(-) diff --git a/undetected_chromedriver/__init__.py b/undetected_chromedriver/__init__.py index ff4d0c6..792a051 100644 --- a/undetected_chromedriver/__init__.py +++ b/undetected_chromedriver/__init__.py @@ -48,19 +48,19 @@ from .webelement import WebElement __all__ = ( - "Chrome", - "ChromeOptions", - "Patcher", - "Reactor", - "CDP", - "find_chrome_executable", -) + "Chrome" , + "ChromeOptions" , + "Patcher" , + "Reactor" , + "CDP" , + "find_chrome_executable" , + ) -logger = logging.getLogger("uc") -logger.setLevel(logging.getLogger().getEffectiveLevel()) +logger = logging.getLogger( "uc" ) +logger.setLevel( logging.getLogger().getEffectiveLevel() ) -class Chrome(selenium.webdriver.chrome.webdriver.WebDriver): +class Chrome( selenium.webdriver.chrome.webdriver.WebDriver ): """ Controls the ChromeDriver and allows you to drive the browser. @@ -96,35 +96,36 @@ class Chrome(selenium.webdriver.chrome.webdriver.WebDriver): -------------------------------------------------------------------------- """ - + _instances = set() session_id = None debug = False - + + def __init__( - self, - options=None, - user_data_dir=None, - driver_executable_path=None, - browser_executable_path=None, - port=0, - enable_cdp_events=False, - service_args=None, - service_creationflags=None, - desired_capabilities=None, - advanced_elements=False, - service_log_path=None, - keep_alive=True, - log_level=0, - headless=False, - version_main=None, - patcher_force_close=False, - suppress_welcome=True, - use_subprocess=True, - debug=False, - no_sandbox=True, - **kw, - ): + self , + options = None , + user_data_dir = None , + driver_executable_path = None , + browser_executable_path = None , + port = 0 , + enable_cdp_events = False , + service_args = None , + service_creationflags = None , + desired_capabilities = None , + advanced_elements = False , + service_log_path = None , + keep_alive = True , + log_level = 0 , + headless = False , + version_main = None , + patcher_force_close = False , + suppress_welcome = True , + use_subprocess = True , + debug = False , + no_sandbox = True , + **kw , + ): """ Creates a new instance of the chrome driver. @@ -235,30 +236,30 @@ class Chrome(selenium.webdriver.chrome.webdriver.WebDriver): this option has a default of True since many people seem to run this as root (....) , and chrome does not start when running as root without using --no-sandbox flag. """ - - finalize(self, self._ensure_close, self) + + finalize( self , self._ensure_close , self ) self.debug = debug self.patcher = Patcher( - executable_path=driver_executable_path, - force=patcher_force_close, - version_main=version_main, - ) + executable_path = driver_executable_path , + force = patcher_force_close , + version_main = version_main , + ) self.patcher.auto() # self.patcher = patcher if not options: options = ChromeOptions() - + try: - if hasattr(options, "_session") and options._session is not None: + if hasattr( options , "_session" ) and options._session is not None: # prevent reuse of options, # as it just appends arguments, not replace them # you'll get conflicts starting chrome - raise RuntimeError("you cannot reuse the ChromeOptions object") + raise RuntimeError( "you cannot reuse the ChromeOptions object" ) except AttributeError: pass - + options._session = self - + if not options.debugger_address: debug_port = ( port @@ -266,237 +267,239 @@ class Chrome(selenium.webdriver.chrome.webdriver.WebDriver): else selenium.webdriver.common.service.utils.free_port() ) debug_host = "127.0.0.1" - options.debugger_address = "%s:%d" % (debug_host, debug_port) + options.debugger_address = "%s:%d" % (debug_host , debug_port) else: - debug_host, debug_port = options.debugger_address.split(":") - debug_port = int(debug_port) - + debug_host , debug_port = options.debugger_address.split( ":" ) + debug_port = int( debug_port ) + if enable_cdp_events: options.set_capability( - "goog:loggingPrefs", {"performance": "ALL", "browser": "ALL"} - ) - - options.add_argument("--remote-debugging-host=%s" % debug_host) - options.add_argument("--remote-debugging-port=%s" % debug_port) - + "goog:loggingPrefs" , { "performance": "ALL" , "browser": "ALL" } + ) + + options.add_argument( "--remote-debugging-host=%s" % debug_host ) + options.add_argument( "--remote-debugging-port=%s" % debug_port ) + if user_data_dir: - options.add_argument("--user-data-dir=%s" % user_data_dir) - - language, keep_user_data_dir = None, bool(user_data_dir) - + options.add_argument( "--user-data-dir=%s" % user_data_dir ) + + language , keep_user_data_dir = None , bool( user_data_dir ) + # see if a custom user profile is specified in options for arg in options.arguments: if "lang" in arg: - m = re.search("(?:--)?lang(?:[ =])?(.*)", arg) + m = re.search( "(?:--)?lang(?:[ =])?(.*)" , arg ) try: - language = m[1] + language = m[ 1 ] except IndexError: - logger.debug("will set the language to en-US,en;q=0.9") + logger.debug( "will set the language to en-US,en;q=0.9" ) language = "en-US,en;q=0.9" - + if "user-data-dir" in arg: - m = re.search("(?:--)?user-data-dir(?:[ =])?(.*)", arg) + m = re.search( "(?:--)?user-data-dir(?:[ =])?(.*)" , arg ) try: - user_data_dir = m[1] + user_data_dir = m[ 1 ] logger.debug( - "user-data-dir found in user argument %s => %s" % (arg, m[1]) - ) + "user-data-dir found in user argument %s => %s" % (arg , m[ 1 ]) + ) keep_user_data_dir = True - + except IndexError: logger.debug( "no user data dir could be extracted from supplied argument %s " % arg - ) - + ) + if not user_data_dir: # backward compatiblity # check if an old uc.ChromeOptions is used, and extract the user data dir - - if hasattr(options, "user_data_dir") and getattr( - options, "user_data_dir", None - ): + + if hasattr( options , "user_data_dir" ) and getattr( + options , "user_data_dir" , None + ): import warnings - + warnings.warn( "using ChromeOptions.user_data_dir might stop working in future versions." "use uc.Chrome(user_data_dir='/xyz/some/data') in case you need existing profile folder" - ) - options.add_argument("--user-data-dir=%s" % options.user_data_dir) + ) + options.add_argument( "--user-data-dir=%s" % options.user_data_dir ) keep_user_data_dir = True logger.debug( "user_data_dir property found in options object: %s" % user_data_dir - ) - + ) + else: - user_data_dir = os.path.normpath(tempfile.mkdtemp()) + user_data_dir = os.path.normpath( tempfile.mkdtemp() ) keep_user_data_dir = False arg = "--user-data-dir=%s" % user_data_dir - options.add_argument(arg) + options.add_argument( arg ) logger.debug( "created a temporary folder in which the user-data (profile) will be stored during this\n" "session, and added it to chrome startup arguments: %s" % arg - ) - + ) + if not language: try: import locale - - language = locale.getdefaultlocale()[0].replace("_", "-") + + language = locale.getdefaultlocale()[ 0 ].replace( "_" , "-" ) except Exception: pass if not language: language = "en-US" - - options.add_argument("--lang=%s" % language) - + + options.add_argument( "--lang=%s" % language ) + if not options.binary_location: options.binary_location = ( - browser_executable_path or find_chrome_executable() + browser_executable_path or find_chrome_executable() ) - + self._delay = 3 - + self.user_data_dir = user_data_dir self.keep_user_data_dir = keep_user_data_dir - + if suppress_welcome: - options.arguments.extend(["--no-default-browser-check", "--no-first-run"]) + options.arguments.extend( [ "--no-default-browser-check" , "--no-first-run" ] ) if no_sandbox: - options.arguments.extend(["--no-sandbox", "--test-type"]) + options.arguments.extend( [ "--no-sandbox" , "--test-type" ] ) if headless or options.headless: options.headless = True - options.add_argument("--window-size=1920,1080") - options.add_argument("--start-maximized") - options.add_argument("--no-sandbox") + options.add_argument( "--window-size=1920,1080" ) + options.add_argument( "--start-maximized" ) + options.add_argument( "--no-sandbox" ) # fixes "could not connect to chrome" error when running # on linux using privileged user like root (which i don't recommend) - + options.add_argument( "--log-level=%d" % log_level - or divmod(logging.getLogger().getEffectiveLevel(), 10)[0] - ) - - if hasattr(options, "handle_prefs"): - options.handle_prefs(user_data_dir) - + or divmod( logging.getLogger().getEffectiveLevel() , 10 )[ 0 ] + ) + + if hasattr( options , "handle_prefs" ): + options.handle_prefs( user_data_dir ) + # fix exit_type flag to prevent tab-restore nag try: with open( - os.path.join(user_data_dir, "Default/Preferences"), - encoding="latin1", - mode="r+", - ) as fs: - config = json.load(fs) - if config["profile"]["exit_type"] is not None: + os.path.join( user_data_dir , "Default/Preferences" ) , + encoding = "latin1" , + mode = "r+" , + ) as fs: + config = json.load( fs ) + if config[ "profile" ][ "exit_type" ] is not None: # fixing the restore-tabs-nag - config["profile"]["exit_type"] = None - fs.seek(0, 0) - json.dump(config, fs) + config[ "profile" ][ "exit_type" ] = None + fs.seek( 0 , 0 ) + json.dump( config , fs ) fs.truncate() # the file might be shorter - logger.debug("fixed exit_type flag") + logger.debug( "fixed exit_type flag" ) except Exception as e: - logger.debug("did not find a bad exit_type flag ") - + logger.debug( "did not find a bad exit_type flag " ) + self.options = options - + if not desired_capabilities: desired_capabilities = options.to_capabilities() - + if not use_subprocess: self.browser_pid = start_detached( - options.binary_location, *options.arguments - ) + options.binary_location , *options.arguments + ) else: browser = subprocess.Popen( - [options.binary_location, *options.arguments], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - close_fds=IS_POSIX, - ) + [ options.binary_location , *options.arguments ] , + stdin = subprocess.PIPE , + stdout = subprocess.PIPE , + stderr = subprocess.PIPE , + close_fds = IS_POSIX , + ) self.browser_pid = browser.pid - + if service_creationflags: service = selenium.webdriver.common.service.Service( - self.patcher.executable_path, port, service_args, service_log_path - ) - for attr_name in ("creationflags", "creation_flags"): - if hasattr(service, attr_name): - setattr(service, attr_name, service_creationflags) + self.patcher.executable_path , port , service_args , service_log_path + ) + for attr_name in ("creationflags" , "creation_flags"): + if hasattr( service , attr_name ): + setattr( service , attr_name , service_creationflags ) break else: service = None - - super(Chrome, self).__init__( - executable_path=self.patcher.executable_path, - port=port, - options=options, - service_args=service_args, - desired_capabilities=desired_capabilities, - service_log_path=service_log_path, - keep_alive=keep_alive, - service=service, # needed or the service will be re-created - ) - + + super( Chrome , self ).__init__( + executable_path = self.patcher.executable_path , + port = port , + options = options , + service_args = service_args , + desired_capabilities = desired_capabilities , + service_log_path = service_log_path , + keep_alive = keep_alive , + service = service , # needed or the service will be re-created + ) + self.reactor = None - + if enable_cdp_events: if logging.getLogger().getEffectiveLevel() == logging.DEBUG: logging.getLogger( "selenium.webdriver.remote.remote_connection" - ).setLevel(20) - reactor = Reactor(self) + ).setLevel( 20 ) + reactor = Reactor( self ) reactor.start() self.reactor = reactor - + if advanced_elements: self._web_element_cls = UCWebElement else: self._web_element_cls = WebElement - + if options.headless: self._configure_headless() - - def _configure_headless(self): + + + def _configure_headless( self ): orig_get = self.get - logger.info("setting properties for headless") - - def get_wrapped(*args, **kwargs): - if self.execute_script("return navigator.webdriver"): - logger.info("patch navigator.webdriver") + logger.info( "setting properties for headless" ) + + + def get_wrapped( *args , **kwargs ): + if self.execute_script( "return navigator.webdriver" ): + logger.info( "patch navigator.webdriver" ) self.execute_cdp_cmd( - "Page.addScriptToEvaluateOnNewDocument", + "Page.addScriptToEvaluateOnNewDocument" , { "source": """ - Object.defineProperty(window, 'navigator', { - value: new Proxy(navigator, { - has: (target, key) => (key === 'webdriver' ? false : key in target), - get: (target, key) => - key === 'webdriver' ? - false : - typeof target[key] === 'function' ? - target[key].bind(target) : - target[key] - }) - }); - + Object.defineProperty(window, "navigator", { + Object.defineProperty(window, "navigator", { + value: new Proxy(navigator, { + has: (target, key) => (key === "webdriver" ? false : key in target), + get: (target, key) => + key === "webdriver" + ? false + : typeof target[key] === "function" + ? target[key].bind(target) + : target[key], + }), + }); """ - }, - ) - - logger.info("patch user-agent string") + } , + ) + + logger.info( "patch user-agent string" ) self.execute_cdp_cmd( - "Network.setUserAgentOverride", + "Network.setUserAgentOverride" , { "userAgent": self.execute_script( "return navigator.userAgent" - ).replace("Headless", "") - }, - ) + ).replace( "Headless" , "" ) + } , + ) self.execute_cdp_cmd( - "Page.addScriptToEvaluateOnNewDocument", + "Page.addScriptToEvaluateOnNewDocument" , { "source": """ Object.defineProperty(navigator, 'maxTouchPoints', {get: () => 1}); @@ -594,12 +597,14 @@ class Chrome(selenium.webdriver.chrome.webdriver.WebDriver): // eslint-disable-next-line Function.prototype.toString = functionToString """ - }, - ) - return orig_get(*args, **kwargs) - + } , + ) + return orig_get( *args , **kwargs ) + + self.get = get_wrapped - + + # def _get_cdc_props(self): # return self.execute_script( # """ @@ -628,32 +633,36 @@ class Chrome(selenium.webdriver.chrome.webdriver.WebDriver): # """ # }, # ) - - def get(self, url): + + def get( self , url ): # if self._get_cdc_props(): # self._hook_remove_cdc_props() - return super().get(url) - - def add_cdp_listener(self, event_name, callback): + return super().get( url ) + + + def add_cdp_listener( self , event_name , callback ): if ( - self.reactor - and self.reactor is not None - and isinstance(self.reactor, Reactor) + self.reactor + and self.reactor is not None + and isinstance( self.reactor , Reactor ) ): - self.reactor.add_event_handler(event_name, callback) + self.reactor.add_event_handler( event_name , callback ) return self.reactor.handlers return False - - def clear_cdp_listeners(self): - if self.reactor and isinstance(self.reactor, Reactor): + + + def clear_cdp_listeners( self ): + if self.reactor and isinstance( self.reactor , Reactor ): self.reactor.handlers.clear() - - def window_new(self): + + + def window_new( self ): self.execute( - selenium.webdriver.remote.command.Command.NEW_WINDOW, {"type": "window"} - ) - - def tab_new(self, url: str): + selenium.webdriver.remote.command.Command.NEW_WINDOW , { "type": "window" } + ) + + + def tab_new( self , url: str ): """ this opens a url in a new tab. apparently, that passes all tests directly! @@ -666,125 +675,135 @@ class Chrome(selenium.webdriver.chrome.webdriver.WebDriver): ------- """ - if not hasattr(self, "cdp"): + if not hasattr( self , "cdp" ): from .cdp import CDP - - cdp = CDP(self.options) - cdp.tab_new(url) - - def reconnect(self, timeout=0.1): + + cdp = CDP( self.options ) + cdp.tab_new( url ) + + + def reconnect( self , timeout = 0.1 ): try: self.service.stop() except Exception as e: - logger.debug(e) - time.sleep(timeout) + logger.debug( e ) + time.sleep( timeout ) try: self.service.start() except Exception as e: - logger.debug(e) - + logger.debug( e ) + try: self.start_session() except Exception as e: - logger.debug(e) - - def start_session(self, capabilities=None, browser_profile=None): + logger.debug( e ) + + + def start_session( self , capabilities = None , browser_profile = None ): if not capabilities: capabilities = self.options.to_capabilities() - super(selenium.webdriver.chrome.webdriver.WebDriver, self).start_session( - capabilities, browser_profile - ) + super( selenium.webdriver.chrome.webdriver.WebDriver , self ).start_session( + capabilities , browser_profile + ) # super(Chrome, self).start_session(capabilities, browser_profile) - - def quit(self): + + + def quit( self ): try: self.service.process.kill() - logger.debug("webdriver process ended") - except (AttributeError, RuntimeError, OSError): + logger.debug( "webdriver process ended" ) + except (AttributeError , RuntimeError , OSError): pass try: self.reactor.event.set() - logger.debug("shutting down reactor") + logger.debug( "shutting down reactor" ) except AttributeError: pass try: - os.kill(self.browser_pid, 15) - logger.debug("gracefully closed browser") + os.kill( self.browser_pid , 15 ) + logger.debug( "gracefully closed browser" ) except Exception as e: # noqa - logger.debug(e, exc_info=True) + logger.debug( e , exc_info = True ) if ( - hasattr(self, "keep_user_data_dir") - and hasattr(self, "user_data_dir") - and not self.keep_user_data_dir + hasattr( self , "keep_user_data_dir" ) + and hasattr( self , "user_data_dir" ) + and not self.keep_user_data_dir ): - for _ in range(5): + for _ in range( 5 ): try: - shutil.rmtree(self.user_data_dir, ignore_errors=False) + shutil.rmtree( self.user_data_dir , ignore_errors = False ) except FileNotFoundError: pass - except (RuntimeError, OSError, PermissionError) as e: + except (RuntimeError , OSError , PermissionError) as e: logger.debug( "When removing the temp profile, a %s occured: %s\nretrying..." - % (e.__class__.__name__, e) - ) + % (e.__class__.__name__ , e) + ) else: - logger.debug("successfully removed %s" % self.user_data_dir) + logger.debug( "successfully removed %s" % self.user_data_dir ) break - time.sleep(0.1) - + time.sleep( 0.1 ) + # dereference patcher, so patcher can start cleaning up as well. # this must come last, otherwise it will throw 'in use' errors self.patcher = None - - def __getattribute__(self, item): - if not super().__getattribute__("debug"): - return super().__getattribute__(item) + + + def __getattribute__( self , item ): + if not super().__getattribute__( "debug" ): + return super().__getattribute__( item ) else: import inspect - - original = super().__getattribute__(item) - if inspect.ismethod(original) and not inspect.isclass(original): - - def newfunc(*args, **kwargs): + + original = super().__getattribute__( item ) + if inspect.ismethod( original ) and not inspect.isclass( original ): + def newfunc( *args , **kwargs ): logger.debug( "calling %s with args %s and kwargs %s\n" - % (original.__qualname__, args, kwargs) - ) - return original(*args, **kwargs) - + % (original.__qualname__ , args , kwargs) + ) + return original( *args , **kwargs ) + + return newfunc return original - - def __enter__(self): + + + def __enter__( self ): return self - - def __exit__(self, exc_type, exc_val, exc_tb): + + + def __exit__( self , exc_type , exc_val , exc_tb ): self.service.stop() - time.sleep(self._delay) + time.sleep( self._delay ) self.service.start() self.start_session() - - def __hash__(self): - return hash(self.options.debugger_address) - - def __dir__(self): - return object.__dir__(self) - - def __del__(self): + + + def __hash__( self ): + return hash( self.options.debugger_address ) + + + def __dir__( self ): + return object.__dir__( self ) + + + def __del__( self ): try: self.service.process.kill() except: # noqa pass self.quit() - + + @classmethod - def _ensure_close(cls, self): + def _ensure_close( cls , self ): # needs to be a classmethod so finalize can find the reference - logger.info("ensuring close") + logger.info( "ensuring close" ) if ( - hasattr(self, "service") - and hasattr(self.service, "process") - and hasattr(self.service.process, "kill") + hasattr( self , "service" ) + and hasattr( self.service , "process" ) + and hasattr( self.service.process , "kill" ) ): self.service.process.kill() @@ -801,34 +820,34 @@ def find_chrome_executable(): """ candidates = set() if IS_POSIX: - for item in os.environ.get("PATH").split(os.pathsep): + for item in os.environ.get( "PATH" ).split( os.pathsep ): for subitem in ( - "google-chrome", - "chromium", - "chromium-browser", - "chrome", - "google-chrome-stable", - ): - candidates.add(os.sep.join((item, subitem))) + "google-chrome" , + "chromium" , + "chromium-browser" , + "chrome" , + "google-chrome-stable" , + ): + candidates.add( os.sep.join( (item , subitem) ) ) if "darwin" in sys.platform: candidates.update( [ - "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome", - "/Applications/Chromium.app/Contents/MacOS/Chromium", - ] - ) + "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome" , + "/Applications/Chromium.app/Contents/MacOS/Chromium" , + ] + ) else: for item in map( - os.environ.get, - ("PROGRAMFILES", "PROGRAMFILES(X86)", "LOCALAPPDATA", "PROGRAMW6432"), - ): + os.environ.get , + ("PROGRAMFILES" , "PROGRAMFILES(X86)" , "LOCALAPPDATA" , "PROGRAMW6432") , + ): if item is not None: for subitem in ( - "Google/Chrome/Application", - "Google/Chrome Beta/Application", - "Google/Chrome Canary/Application", - ): - candidates.add(os.sep.join((item, subitem, "chrome.exe"))) + "Google/Chrome/Application" , + "Google/Chrome Beta/Application" , + "Google/Chrome Canary/Application" , + ): + candidates.add( os.sep.join( (item , subitem , "chrome.exe") ) ) for candidate in candidates: - if os.path.exists(candidate) and os.access(candidate, os.X_OK): - return os.path.normpath(candidate) + if os.path.exists( candidate ) and os.access( candidate , os.X_OK ): + return os.path.normpath( candidate ) diff --git a/undetected_chromedriver/patcher.py b/undetected_chromedriver/patcher.py index 22396d2..b8c227b 100644 --- a/undetected_chromedriver/patcher.py +++ b/undetected_chromedriver/patcher.py @@ -217,46 +217,49 @@ class Patcher(object): def is_binary_patched(self, executable_path=None): executable_path = executable_path or self.executable_path - with io.open(executable_path, "rb") as fh: - return fh.read().find(b"undetected chromedriver") != -1 - + try: + with io.open(executable_path, "rb") as fh: + return fh.read().find(b"undetected chromedriver") != -1 + except FileNotFoundError: + return False + def patch_exe(self): start = time.perf_counter() logger.info("patching driver executable %s" % self.executable_path) with io.open(self.executable_path, "r+b") as fh: - content = fh.read() - match_injected_codeblock = re.search(rb"{window.*;}", content) - if match_injected_codeblock: - target_bytes = match_injected_codeblock[0] - new_target_bytes = ( - b'{console.log("undetected chromedriver 1337!")}'.ljust( - len(target_bytes), b" " - ) - ) - new_content = content.replace(target_bytes, new_target_bytes) - if new_content == content: - logger.warning( - "something went wrong patching the driver binary. could not find injection code block" - ) - else: - logger.debug( - "found block:\n%s\nreplacing with:\n%s" - % (target_bytes, new_target_bytes) - ) - fh.seek(0) - fh.write(new_content) + #content = fh.read() + #match_injected_codeblock = re.search(rb"{window.*;}", content) + #if match_injected_codeblock: + # target_bytes = match_injected_codeblock[0] + # new_target_bytes = ( + # b'{console.log("undetected chromedriver 1337!")}'.ljust( + # len(target_bytes), b" " + # ) + # ) + # new_content = content.replace(target_bytes, new_target_bytes) + # if new_content == content: + # logger.warning( + # "something went wrong patching the driver binary. could not find injection code block" + # ) + # else: + # logger.debug( + # "found block:\n%s\nreplacing with:\n%s" + # % (target_bytes, new_target_bytes) + # ) + # fh.seek(0) + # fh.write(new_content) - # we just keep the cdc variables as they can't be injected anyways so no harm - # keeping for reference - # fh.seek(0) - # for line in iter( lambda: fh.readline() , b"" ): - # if b'cdc_' in line: - # fh.seek( -len( line ) , 1 ) - # new_line = re.sub( b"cdc_.{22}_" , self.gen_random_cdc() , line ) - # logger.debug( 'replaced %s\n\twith:%s' % (line , new_line) ) - # fh.write( new_line ) - else: - logger.info("%s seems already patched ?!?!" % self.executable_path) + # we just keep the cdc variables as they can't be injected anyways so no harm + # keeping for reference + # fh.seek(0) + for line in iter( lambda: fh.readline() , b"" ): + if b'cdc_' in line: + fh.seek( -len( line ) , 1 ) + new_line = re.sub( b"cdc_.{22}_" , self.gen_random_cdc() , line ) + logger.debug( 'replaced %s\n\twith:%s' % (line , new_line) ) + fh.write( new_line ) + # else: + # logger.info("%s seems already patched ?!?!" % self.executable_path) logger.debug( "patching took us {:.2f} seconds".format(time.perf_counter() - start) ) From d3fe33fcebccdbb0ccea9d7818fcbfe3027f9391 Mon Sep 17 00:00:00 2001 From: UltrafunkAmsterdam Date: Wed, 8 Feb 2023 01:27:50 +0100 Subject: [PATCH 2/2] 3.4.4 - Fixed --- undetected_chromedriver/__init__.py | 579 ++++++++++++++-------------- undetected_chromedriver/patcher.py | 64 ++- 2 files changed, 303 insertions(+), 340 deletions(-) diff --git a/undetected_chromedriver/__init__.py b/undetected_chromedriver/__init__.py index 792a051..520106d 100644 --- a/undetected_chromedriver/__init__.py +++ b/undetected_chromedriver/__init__.py @@ -17,7 +17,7 @@ by UltrafunkAmsterdam (https://github.com/ultrafunkamsterdam) from __future__ import annotations -__version__ = "3.4.2" +__version__ = "3.4.4" import json import logging @@ -48,19 +48,19 @@ from .webelement import WebElement __all__ = ( - "Chrome" , - "ChromeOptions" , - "Patcher" , - "Reactor" , - "CDP" , - "find_chrome_executable" , - ) + "Chrome", + "ChromeOptions", + "Patcher", + "Reactor", + "CDP", + "find_chrome_executable", +) -logger = logging.getLogger( "uc" ) -logger.setLevel( logging.getLogger().getEffectiveLevel() ) +logger = logging.getLogger("uc") +logger.setLevel(logging.getLogger().getEffectiveLevel()) -class Chrome( selenium.webdriver.chrome.webdriver.WebDriver ): +class Chrome(selenium.webdriver.chrome.webdriver.WebDriver): """ Controls the ChromeDriver and allows you to drive the browser. @@ -96,36 +96,35 @@ class Chrome( selenium.webdriver.chrome.webdriver.WebDriver ): -------------------------------------------------------------------------- """ - + _instances = set() session_id = None debug = False - - + def __init__( - self , - options = None , - user_data_dir = None , - driver_executable_path = None , - browser_executable_path = None , - port = 0 , - enable_cdp_events = False , - service_args = None , - service_creationflags = None , - desired_capabilities = None , - advanced_elements = False , - service_log_path = None , - keep_alive = True , - log_level = 0 , - headless = False , - version_main = None , - patcher_force_close = False , - suppress_welcome = True , - use_subprocess = True , - debug = False , - no_sandbox = True , - **kw , - ): + self, + options=None, + user_data_dir=None, + driver_executable_path=None, + browser_executable_path=None, + port=0, + enable_cdp_events=False, + service_args=None, + service_creationflags=None, + desired_capabilities=None, + advanced_elements=False, + service_log_path=None, + keep_alive=True, + log_level=0, + headless=False, + version_main=None, + patcher_force_close=False, + suppress_welcome=True, + use_subprocess=True, + debug=False, + no_sandbox=True, + **kw, + ): """ Creates a new instance of the chrome driver. @@ -236,30 +235,30 @@ class Chrome( selenium.webdriver.chrome.webdriver.WebDriver ): this option has a default of True since many people seem to run this as root (....) , and chrome does not start when running as root without using --no-sandbox flag. """ - - finalize( self , self._ensure_close , self ) + + finalize(self, self._ensure_close, self) self.debug = debug self.patcher = Patcher( - executable_path = driver_executable_path , - force = patcher_force_close , - version_main = version_main , - ) + executable_path=driver_executable_path, + force=patcher_force_close, + version_main=version_main, + ) self.patcher.auto() # self.patcher = patcher if not options: options = ChromeOptions() - + try: - if hasattr( options , "_session" ) and options._session is not None: + if hasattr(options, "_session") and options._session is not None: # prevent reuse of options, # as it just appends arguments, not replace them # you'll get conflicts starting chrome - raise RuntimeError( "you cannot reuse the ChromeOptions object" ) + raise RuntimeError("you cannot reuse the ChromeOptions object") except AttributeError: pass - + options._session = self - + if not options.debugger_address: debug_port = ( port @@ -267,209 +266,207 @@ class Chrome( selenium.webdriver.chrome.webdriver.WebDriver ): else selenium.webdriver.common.service.utils.free_port() ) debug_host = "127.0.0.1" - options.debugger_address = "%s:%d" % (debug_host , debug_port) + options.debugger_address = "%s:%d" % (debug_host, debug_port) else: - debug_host , debug_port = options.debugger_address.split( ":" ) - debug_port = int( debug_port ) - + debug_host, debug_port = options.debugger_address.split(":") + debug_port = int(debug_port) + if enable_cdp_events: options.set_capability( - "goog:loggingPrefs" , { "performance": "ALL" , "browser": "ALL" } - ) - - options.add_argument( "--remote-debugging-host=%s" % debug_host ) - options.add_argument( "--remote-debugging-port=%s" % debug_port ) - + "goog:loggingPrefs", {"performance": "ALL", "browser": "ALL"} + ) + + options.add_argument("--remote-debugging-host=%s" % debug_host) + options.add_argument("--remote-debugging-port=%s" % debug_port) + if user_data_dir: - options.add_argument( "--user-data-dir=%s" % user_data_dir ) - - language , keep_user_data_dir = None , bool( user_data_dir ) - + options.add_argument("--user-data-dir=%s" % user_data_dir) + + language, keep_user_data_dir = None, bool(user_data_dir) + # see if a custom user profile is specified in options for arg in options.arguments: if "lang" in arg: - m = re.search( "(?:--)?lang(?:[ =])?(.*)" , arg ) + m = re.search("(?:--)?lang(?:[ =])?(.*)", arg) try: - language = m[ 1 ] + language = m[1] except IndexError: - logger.debug( "will set the language to en-US,en;q=0.9" ) + logger.debug("will set the language to en-US,en;q=0.9") language = "en-US,en;q=0.9" - + if "user-data-dir" in arg: - m = re.search( "(?:--)?user-data-dir(?:[ =])?(.*)" , arg ) + m = re.search("(?:--)?user-data-dir(?:[ =])?(.*)", arg) try: - user_data_dir = m[ 1 ] + user_data_dir = m[1] logger.debug( - "user-data-dir found in user argument %s => %s" % (arg , m[ 1 ]) - ) + "user-data-dir found in user argument %s => %s" % (arg, m[1]) + ) keep_user_data_dir = True - + except IndexError: logger.debug( "no user data dir could be extracted from supplied argument %s " % arg - ) - + ) + if not user_data_dir: # backward compatiblity # check if an old uc.ChromeOptions is used, and extract the user data dir - - if hasattr( options , "user_data_dir" ) and getattr( - options , "user_data_dir" , None - ): + + if hasattr(options, "user_data_dir") and getattr( + options, "user_data_dir", None + ): import warnings - + warnings.warn( "using ChromeOptions.user_data_dir might stop working in future versions." "use uc.Chrome(user_data_dir='/xyz/some/data') in case you need existing profile folder" - ) - options.add_argument( "--user-data-dir=%s" % options.user_data_dir ) + ) + options.add_argument("--user-data-dir=%s" % options.user_data_dir) keep_user_data_dir = True logger.debug( "user_data_dir property found in options object: %s" % user_data_dir - ) - + ) + else: - user_data_dir = os.path.normpath( tempfile.mkdtemp() ) + user_data_dir = os.path.normpath(tempfile.mkdtemp()) keep_user_data_dir = False arg = "--user-data-dir=%s" % user_data_dir - options.add_argument( arg ) + options.add_argument(arg) logger.debug( "created a temporary folder in which the user-data (profile) will be stored during this\n" "session, and added it to chrome startup arguments: %s" % arg - ) - + ) + if not language: try: import locale - - language = locale.getdefaultlocale()[ 0 ].replace( "_" , "-" ) + + language = locale.getdefaultlocale()[0].replace("_", "-") except Exception: pass if not language: language = "en-US" - - options.add_argument( "--lang=%s" % language ) - + + options.add_argument("--lang=%s" % language) + if not options.binary_location: options.binary_location = ( - browser_executable_path or find_chrome_executable() + browser_executable_path or find_chrome_executable() ) - + self._delay = 3 - + self.user_data_dir = user_data_dir self.keep_user_data_dir = keep_user_data_dir - + if suppress_welcome: - options.arguments.extend( [ "--no-default-browser-check" , "--no-first-run" ] ) + options.arguments.extend(["--no-default-browser-check", "--no-first-run"]) if no_sandbox: - options.arguments.extend( [ "--no-sandbox" , "--test-type" ] ) + options.arguments.extend(["--no-sandbox", "--test-type"]) if headless or options.headless: options.headless = True - options.add_argument( "--window-size=1920,1080" ) - options.add_argument( "--start-maximized" ) - options.add_argument( "--no-sandbox" ) + options.add_argument("--window-size=1920,1080") + options.add_argument("--start-maximized") + options.add_argument("--no-sandbox") # fixes "could not connect to chrome" error when running # on linux using privileged user like root (which i don't recommend) - + options.add_argument( "--log-level=%d" % log_level - or divmod( logging.getLogger().getEffectiveLevel() , 10 )[ 0 ] - ) - - if hasattr( options , "handle_prefs" ): - options.handle_prefs( user_data_dir ) - + or divmod(logging.getLogger().getEffectiveLevel(), 10)[0] + ) + + if hasattr(options, "handle_prefs"): + options.handle_prefs(user_data_dir) + # fix exit_type flag to prevent tab-restore nag try: with open( - os.path.join( user_data_dir , "Default/Preferences" ) , - encoding = "latin1" , - mode = "r+" , - ) as fs: - config = json.load( fs ) - if config[ "profile" ][ "exit_type" ] is not None: + os.path.join(user_data_dir, "Default/Preferences"), + encoding="latin1", + mode="r+", + ) as fs: + config = json.load(fs) + if config["profile"]["exit_type"] is not None: # fixing the restore-tabs-nag - config[ "profile" ][ "exit_type" ] = None - fs.seek( 0 , 0 ) - json.dump( config , fs ) + config["profile"]["exit_type"] = None + fs.seek(0, 0) + json.dump(config, fs) fs.truncate() # the file might be shorter - logger.debug( "fixed exit_type flag" ) + logger.debug("fixed exit_type flag") except Exception as e: - logger.debug( "did not find a bad exit_type flag " ) - + logger.debug("did not find a bad exit_type flag ") + self.options = options - + if not desired_capabilities: desired_capabilities = options.to_capabilities() - + if not use_subprocess: self.browser_pid = start_detached( - options.binary_location , *options.arguments - ) + options.binary_location, *options.arguments + ) else: browser = subprocess.Popen( - [ options.binary_location , *options.arguments ] , - stdin = subprocess.PIPE , - stdout = subprocess.PIPE , - stderr = subprocess.PIPE , - close_fds = IS_POSIX , - ) + [options.binary_location, *options.arguments], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + close_fds=IS_POSIX, + ) self.browser_pid = browser.pid - + if service_creationflags: service = selenium.webdriver.common.service.Service( - self.patcher.executable_path , port , service_args , service_log_path - ) - for attr_name in ("creationflags" , "creation_flags"): - if hasattr( service , attr_name ): - setattr( service , attr_name , service_creationflags ) + self.patcher.executable_path, port, service_args, service_log_path + ) + for attr_name in ("creationflags", "creation_flags"): + if hasattr(service, attr_name): + setattr(service, attr_name, service_creationflags) break else: service = None - - super( Chrome , self ).__init__( - executable_path = self.patcher.executable_path , - port = port , - options = options , - service_args = service_args , - desired_capabilities = desired_capabilities , - service_log_path = service_log_path , - keep_alive = keep_alive , - service = service , # needed or the service will be re-created - ) - + + super(Chrome, self).__init__( + executable_path=self.patcher.executable_path, + port=port, + options=options, + service_args=service_args, + desired_capabilities=desired_capabilities, + service_log_path=service_log_path, + keep_alive=keep_alive, + service=service, # needed or the service will be re-created + ) + self.reactor = None - + if enable_cdp_events: if logging.getLogger().getEffectiveLevel() == logging.DEBUG: logging.getLogger( "selenium.webdriver.remote.remote_connection" - ).setLevel( 20 ) - reactor = Reactor( self ) + ).setLevel(20) + reactor = Reactor(self) reactor.start() self.reactor = reactor - + if advanced_elements: self._web_element_cls = UCWebElement else: self._web_element_cls = WebElement - + if options.headless: self._configure_headless() - - - def _configure_headless( self ): + + def _configure_headless(self): orig_get = self.get - logger.info( "setting properties for headless" ) - - - def get_wrapped( *args , **kwargs ): - if self.execute_script( "return navigator.webdriver" ): - logger.info( "patch navigator.webdriver" ) + logger.info("setting properties for headless") + + def get_wrapped(*args, **kwargs): + if self.execute_script("return navigator.webdriver"): + logger.info("patch navigator.webdriver") self.execute_cdp_cmd( - "Page.addScriptToEvaluateOnNewDocument" , + "Page.addScriptToEvaluateOnNewDocument", { "source": """ @@ -486,20 +483,20 @@ class Chrome( selenium.webdriver.chrome.webdriver.WebDriver ): }), }); """ - } , - ) - - logger.info( "patch user-agent string" ) + }, + ) + + logger.info("patch user-agent string") self.execute_cdp_cmd( - "Network.setUserAgentOverride" , + "Network.setUserAgentOverride", { "userAgent": self.execute_script( "return navigator.userAgent" - ).replace( "Headless" , "" ) - } , - ) + ).replace("Headless", "") + }, + ) self.execute_cdp_cmd( - "Page.addScriptToEvaluateOnNewDocument" , + "Page.addScriptToEvaluateOnNewDocument", { "source": """ Object.defineProperty(navigator, 'maxTouchPoints', {get: () => 1}); @@ -597,14 +594,12 @@ class Chrome( selenium.webdriver.chrome.webdriver.WebDriver ): // eslint-disable-next-line Function.prototype.toString = functionToString """ - } , - ) - return orig_get( *args , **kwargs ) - - + }, + ) + return orig_get(*args, **kwargs) + self.get = get_wrapped - - + # def _get_cdc_props(self): # return self.execute_script( # """ @@ -633,36 +628,32 @@ class Chrome( selenium.webdriver.chrome.webdriver.WebDriver ): # """ # }, # ) - - def get( self , url ): + + def get(self, url): # if self._get_cdc_props(): # self._hook_remove_cdc_props() - return super().get( url ) - - - def add_cdp_listener( self , event_name , callback ): + return super().get(url) + + def add_cdp_listener(self, event_name, callback): if ( - self.reactor - and self.reactor is not None - and isinstance( self.reactor , Reactor ) + self.reactor + and self.reactor is not None + and isinstance(self.reactor, Reactor) ): - self.reactor.add_event_handler( event_name , callback ) + self.reactor.add_event_handler(event_name, callback) return self.reactor.handlers return False - - - def clear_cdp_listeners( self ): - if self.reactor and isinstance( self.reactor , Reactor ): + + def clear_cdp_listeners(self): + if self.reactor and isinstance(self.reactor, Reactor): self.reactor.handlers.clear() - - - def window_new( self ): + + def window_new(self): self.execute( - selenium.webdriver.remote.command.Command.NEW_WINDOW , { "type": "window" } - ) - - - def tab_new( self , url: str ): + selenium.webdriver.remote.command.Command.NEW_WINDOW, {"type": "window"} + ) + + def tab_new(self, url: str): """ this opens a url in a new tab. apparently, that passes all tests directly! @@ -675,135 +666,125 @@ class Chrome( selenium.webdriver.chrome.webdriver.WebDriver ): ------- """ - if not hasattr( self , "cdp" ): + if not hasattr(self, "cdp"): from .cdp import CDP - - cdp = CDP( self.options ) - cdp.tab_new( url ) - - - def reconnect( self , timeout = 0.1 ): + + cdp = CDP(self.options) + cdp.tab_new(url) + + def reconnect(self, timeout=0.1): try: self.service.stop() except Exception as e: - logger.debug( e ) - time.sleep( timeout ) + logger.debug(e) + time.sleep(timeout) try: self.service.start() except Exception as e: - logger.debug( e ) - + logger.debug(e) + try: self.start_session() except Exception as e: - logger.debug( e ) - - - def start_session( self , capabilities = None , browser_profile = None ): + logger.debug(e) + + def start_session(self, capabilities=None, browser_profile=None): if not capabilities: capabilities = self.options.to_capabilities() - super( selenium.webdriver.chrome.webdriver.WebDriver , self ).start_session( - capabilities , browser_profile - ) + super(selenium.webdriver.chrome.webdriver.WebDriver, self).start_session( + capabilities, browser_profile + ) # super(Chrome, self).start_session(capabilities, browser_profile) - - - def quit( self ): + + def quit(self): try: self.service.process.kill() - logger.debug( "webdriver process ended" ) - except (AttributeError , RuntimeError , OSError): + logger.debug("webdriver process ended") + except (AttributeError, RuntimeError, OSError): pass try: self.reactor.event.set() - logger.debug( "shutting down reactor" ) + logger.debug("shutting down reactor") except AttributeError: pass try: - os.kill( self.browser_pid , 15 ) - logger.debug( "gracefully closed browser" ) + os.kill(self.browser_pid, 15) + logger.debug("gracefully closed browser") except Exception as e: # noqa - logger.debug( e , exc_info = True ) + logger.debug(e, exc_info=True) if ( - hasattr( self , "keep_user_data_dir" ) - and hasattr( self , "user_data_dir" ) - and not self.keep_user_data_dir + hasattr(self, "keep_user_data_dir") + and hasattr(self, "user_data_dir") + and not self.keep_user_data_dir ): - for _ in range( 5 ): + for _ in range(5): try: - shutil.rmtree( self.user_data_dir , ignore_errors = False ) + shutil.rmtree(self.user_data_dir, ignore_errors=False) except FileNotFoundError: pass - except (RuntimeError , OSError , PermissionError) as e: + except (RuntimeError, OSError, PermissionError) as e: logger.debug( "When removing the temp profile, a %s occured: %s\nretrying..." - % (e.__class__.__name__ , e) - ) + % (e.__class__.__name__, e) + ) else: - logger.debug( "successfully removed %s" % self.user_data_dir ) + logger.debug("successfully removed %s" % self.user_data_dir) break - time.sleep( 0.1 ) - + time.sleep(0.1) + # dereference patcher, so patcher can start cleaning up as well. # this must come last, otherwise it will throw 'in use' errors self.patcher = None - - - def __getattribute__( self , item ): - if not super().__getattribute__( "debug" ): - return super().__getattribute__( item ) + + def __getattribute__(self, item): + if not super().__getattribute__("debug"): + return super().__getattribute__(item) else: import inspect - - original = super().__getattribute__( item ) - if inspect.ismethod( original ) and not inspect.isclass( original ): - def newfunc( *args , **kwargs ): + + original = super().__getattribute__(item) + if inspect.ismethod(original) and not inspect.isclass(original): + + def newfunc(*args, **kwargs): logger.debug( "calling %s with args %s and kwargs %s\n" - % (original.__qualname__ , args , kwargs) - ) - return original( *args , **kwargs ) - - + % (original.__qualname__, args, kwargs) + ) + return original(*args, **kwargs) + return newfunc return original - - - def __enter__( self ): + + def __enter__(self): return self - - - def __exit__( self , exc_type , exc_val , exc_tb ): + + def __exit__(self, exc_type, exc_val, exc_tb): self.service.stop() - time.sleep( self._delay ) + time.sleep(self._delay) self.service.start() self.start_session() - - - def __hash__( self ): - return hash( self.options.debugger_address ) - - - def __dir__( self ): - return object.__dir__( self ) - - - def __del__( self ): + + def __hash__(self): + return hash(self.options.debugger_address) + + def __dir__(self): + return object.__dir__(self) + + def __del__(self): try: self.service.process.kill() except: # noqa pass self.quit() - - + @classmethod - def _ensure_close( cls , self ): + def _ensure_close(cls, self): # needs to be a classmethod so finalize can find the reference - logger.info( "ensuring close" ) + logger.info("ensuring close") if ( - hasattr( self , "service" ) - and hasattr( self.service , "process" ) - and hasattr( self.service.process , "kill" ) + hasattr(self, "service") + and hasattr(self.service, "process") + and hasattr(self.service.process, "kill") ): self.service.process.kill() @@ -820,34 +801,34 @@ def find_chrome_executable(): """ candidates = set() if IS_POSIX: - for item in os.environ.get( "PATH" ).split( os.pathsep ): + for item in os.environ.get("PATH").split(os.pathsep): for subitem in ( - "google-chrome" , - "chromium" , - "chromium-browser" , - "chrome" , - "google-chrome-stable" , - ): - candidates.add( os.sep.join( (item , subitem) ) ) + "google-chrome", + "chromium", + "chromium-browser", + "chrome", + "google-chrome-stable", + ): + candidates.add(os.sep.join((item, subitem))) if "darwin" in sys.platform: candidates.update( [ - "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome" , - "/Applications/Chromium.app/Contents/MacOS/Chromium" , - ] - ) + "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome", + "/Applications/Chromium.app/Contents/MacOS/Chromium", + ] + ) else: for item in map( - os.environ.get , - ("PROGRAMFILES" , "PROGRAMFILES(X86)" , "LOCALAPPDATA" , "PROGRAMW6432") , - ): + os.environ.get, + ("PROGRAMFILES", "PROGRAMFILES(X86)", "LOCALAPPDATA", "PROGRAMW6432"), + ): if item is not None: for subitem in ( - "Google/Chrome/Application" , - "Google/Chrome Beta/Application" , - "Google/Chrome Canary/Application" , - ): - candidates.add( os.sep.join( (item , subitem , "chrome.exe") ) ) + "Google/Chrome/Application", + "Google/Chrome Beta/Application", + "Google/Chrome Canary/Application", + ): + candidates.add(os.sep.join((item, subitem, "chrome.exe"))) for candidate in candidates: - if os.path.exists( candidate ) and os.access( candidate , os.X_OK ): - return os.path.normpath( candidate ) + if os.path.exists(candidate) and os.access(candidate, os.X_OK): + return os.path.normpath(candidate) diff --git a/undetected_chromedriver/patcher.py b/undetected_chromedriver/patcher.py index b8c227b..af87e5e 100644 --- a/undetected_chromedriver/patcher.py +++ b/undetected_chromedriver/patcher.py @@ -94,9 +94,7 @@ class Patcher(object): self.version_full = None def auto(self, executable_path=None, force=False, version_main=None): - if executable_path: - self.executable_path = executable_path self._custom_exe_path = True @@ -207,12 +205,7 @@ class Patcher(object): @staticmethod def gen_random_cdc(): - # make cdc_variables without underscores cdc = random.choices(string.ascii_letters, k=27) - - # cdc[-6:-4] = map(str.upper, cdc[-6:-4]) - # cdc[2] = cdc[0] - # cdc[3] = "_" return "".join(cdc).encode() def is_binary_patched(self, executable_path=None): @@ -222,44 +215,33 @@ class Patcher(object): return fh.read().find(b"undetected chromedriver") != -1 except FileNotFoundError: return False - + def patch_exe(self): start = time.perf_counter() logger.info("patching driver executable %s" % self.executable_path) with io.open(self.executable_path, "r+b") as fh: - #content = fh.read() - #match_injected_codeblock = re.search(rb"{window.*;}", content) - #if match_injected_codeblock: - # target_bytes = match_injected_codeblock[0] - # new_target_bytes = ( - # b'{console.log("undetected chromedriver 1337!")}'.ljust( - # len(target_bytes), b" " - # ) - # ) - # new_content = content.replace(target_bytes, new_target_bytes) - # if new_content == content: - # logger.warning( - # "something went wrong patching the driver binary. could not find injection code block" - # ) - # else: - # logger.debug( - # "found block:\n%s\nreplacing with:\n%s" - # % (target_bytes, new_target_bytes) - # ) - # fh.seek(0) - # fh.write(new_content) - - # we just keep the cdc variables as they can't be injected anyways so no harm - # keeping for reference - # fh.seek(0) - for line in iter( lambda: fh.readline() , b"" ): - if b'cdc_' in line: - fh.seek( -len( line ) , 1 ) - new_line = re.sub( b"cdc_.{22}_" , self.gen_random_cdc() , line ) - logger.debug( 'replaced %s\n\twith:%s' % (line , new_line) ) - fh.write( new_line ) - # else: - # logger.info("%s seems already patched ?!?!" % self.executable_path) + content = fh.read() + # match_injected_codeblock = re.search(rb"{window.*;}", content) + match_injected_codeblock = re.search(rb"\{window\.cdc.*?;\}", content) + if match_injected_codeblock: + target_bytes = match_injected_codeblock[0] + new_target_bytes = ( + b'{console.log("undetected chromedriver 1337!")}'.ljust( + len(target_bytes), b" " + ) + ) + new_content = content.replace(target_bytes, new_target_bytes) + if new_content == content: + logger.warning( + "something went wrong patching the driver binary. could not find injection code block" + ) + else: + logger.debug( + "found block:\n%s\nreplacing with:\n%s" + % (target_bytes, new_target_bytes) + ) + fh.seek(0) + fh.write(new_content) logger.debug( "patching took us {:.2f} seconds".format(time.perf_counter() - start) )