fix for linux find_elements: SyntaxError: missing ) after argument list

This commit is contained in:
UltrafunkAmsterdam 2023-02-05 18:37:28 +01:00
parent 8baa77352f
commit 305803ca95
2 changed files with 348 additions and 326 deletions

View File

@ -101,6 +101,7 @@ class Chrome(selenium.webdriver.chrome.webdriver.WebDriver):
session_id = None session_id = None
debug = False debug = False
def __init__( def __init__(
self , self ,
options = None , options = None ,
@ -458,10 +459,12 @@ class Chrome(selenium.webdriver.chrome.webdriver.WebDriver):
if options.headless: if options.headless:
self._configure_headless() self._configure_headless()
def _configure_headless( self ): def _configure_headless( self ):
orig_get = self.get orig_get = self.get
logger.info( "setting properties for headless" ) logger.info( "setting properties for headless" )
def get_wrapped( *args , **kwargs ): def get_wrapped( *args , **kwargs ):
if self.execute_script( "return navigator.webdriver" ): if self.execute_script( "return navigator.webdriver" ):
logger.info( "patch navigator.webdriver" ) logger.info( "patch navigator.webdriver" )
@ -470,18 +473,18 @@ class Chrome(selenium.webdriver.chrome.webdriver.WebDriver):
{ {
"source": """ "source": """
Object.defineProperty(window, 'navigator', { Object.defineProperty(window, "navigator", {
Object.defineProperty(window, "navigator", {
value: new Proxy(navigator, { value: new Proxy(navigator, {
has: (target, key) => (key === 'webdriver' ? false : key in target), has: (target, key) => (key === "webdriver" ? false : key in target),
get: (target, key) => get: (target, key) =>
key === 'webdriver' ? key === "webdriver"
false : ? false
typeof target[key] === 'function' ? : typeof target[key] === "function"
target[key].bind(target) : ? target[key].bind(target)
target[key] : target[key],
}) }),
}); });
""" """
} , } ,
) )
@ -598,8 +601,10 @@ class Chrome(selenium.webdriver.chrome.webdriver.WebDriver):
) )
return orig_get( *args , **kwargs ) return orig_get( *args , **kwargs )
self.get = get_wrapped self.get = get_wrapped
# def _get_cdc_props(self): # def _get_cdc_props(self):
# return self.execute_script( # return self.execute_script(
# """ # """
@ -634,6 +639,7 @@ class Chrome(selenium.webdriver.chrome.webdriver.WebDriver):
# self._hook_remove_cdc_props() # self._hook_remove_cdc_props()
return super().get( url ) return super().get( url )
def add_cdp_listener( self , event_name , callback ): def add_cdp_listener( self , event_name , callback ):
if ( if (
self.reactor self.reactor
@ -644,15 +650,18 @@ class Chrome(selenium.webdriver.chrome.webdriver.WebDriver):
return self.reactor.handlers return self.reactor.handlers
return False return False
def clear_cdp_listeners( self ): def clear_cdp_listeners( self ):
if self.reactor and isinstance( self.reactor , Reactor ): if self.reactor and isinstance( self.reactor , Reactor ):
self.reactor.handlers.clear() self.reactor.handlers.clear()
def window_new( self ): def window_new( self ):
self.execute( self.execute(
selenium.webdriver.remote.command.Command.NEW_WINDOW , { "type": "window" } selenium.webdriver.remote.command.Command.NEW_WINDOW , { "type": "window" }
) )
def tab_new( self , url: str ): def tab_new( self , url: str ):
""" """
this opens a url in a new tab. this opens a url in a new tab.
@ -672,6 +681,7 @@ class Chrome(selenium.webdriver.chrome.webdriver.WebDriver):
cdp = CDP( self.options ) cdp = CDP( self.options )
cdp.tab_new( url ) cdp.tab_new( url )
def reconnect( self , timeout = 0.1 ): def reconnect( self , timeout = 0.1 ):
try: try:
self.service.stop() self.service.stop()
@ -688,6 +698,7 @@ class Chrome(selenium.webdriver.chrome.webdriver.WebDriver):
except Exception as e: except Exception as e:
logger.debug( e ) logger.debug( e )
def start_session( self , capabilities = None , browser_profile = None ): def start_session( self , capabilities = None , browser_profile = None ):
if not capabilities: if not capabilities:
capabilities = self.options.to_capabilities() capabilities = self.options.to_capabilities()
@ -696,6 +707,7 @@ class Chrome(selenium.webdriver.chrome.webdriver.WebDriver):
) )
# super(Chrome, self).start_session(capabilities, browser_profile) # super(Chrome, self).start_session(capabilities, browser_profile)
def quit( self ): def quit( self ):
try: try:
self.service.process.kill() self.service.process.kill()
@ -736,6 +748,7 @@ class Chrome(selenium.webdriver.chrome.webdriver.WebDriver):
# this must come last, otherwise it will throw 'in use' errors # this must come last, otherwise it will throw 'in use' errors
self.patcher = None self.patcher = None
def __getattribute__( self , item ): def __getattribute__( self , item ):
if not super().__getattribute__( "debug" ): if not super().__getattribute__( "debug" ):
return super().__getattribute__( item ) return super().__getattribute__( item )
@ -744,7 +757,6 @@ class Chrome(selenium.webdriver.chrome.webdriver.WebDriver):
original = super().__getattribute__( item ) original = super().__getattribute__( item )
if inspect.ismethod( original ) and not inspect.isclass( original ): if inspect.ismethod( original ) and not inspect.isclass( original ):
def newfunc( *args , **kwargs ): def newfunc( *args , **kwargs ):
logger.debug( logger.debug(
"calling %s with args %s and kwargs %s\n" "calling %s with args %s and kwargs %s\n"
@ -752,24 +764,30 @@ class Chrome(selenium.webdriver.chrome.webdriver.WebDriver):
) )
return original( *args , **kwargs ) return original( *args , **kwargs )
return newfunc return newfunc
return original return original
def __enter__( self ): def __enter__( self ):
return self return self
def __exit__( self , exc_type , exc_val , exc_tb ): def __exit__( self , exc_type , exc_val , exc_tb ):
self.service.stop() self.service.stop()
time.sleep( self._delay ) time.sleep( self._delay )
self.service.start() self.service.start()
self.start_session() self.start_session()
def __hash__( self ): def __hash__( self ):
return hash( self.options.debugger_address ) return hash( self.options.debugger_address )
def __dir__( self ): def __dir__( self ):
return object.__dir__( self ) return object.__dir__( self )
def __del__( self ): def __del__( self ):
try: try:
self.service.process.kill() self.service.process.kill()
@ -777,6 +795,7 @@ class Chrome(selenium.webdriver.chrome.webdriver.WebDriver):
pass pass
self.quit() self.quit()
@classmethod @classmethod
def _ensure_close( cls , self ): def _ensure_close( cls , self ):
# needs to be a classmethod so finalize can find the reference # needs to be a classmethod so finalize can find the reference

View File

@ -217,46 +217,49 @@ class Patcher(object):
def is_binary_patched(self, executable_path=None): def is_binary_patched(self, executable_path=None):
executable_path = executable_path or self.executable_path executable_path = executable_path or self.executable_path
try:
with io.open(executable_path, "rb") as fh: with io.open(executable_path, "rb") as fh:
return fh.read().find(b"undetected chromedriver") != -1 return fh.read().find(b"undetected chromedriver") != -1
except FileNotFoundError:
return False
def patch_exe(self): def patch_exe(self):
start = time.perf_counter() start = time.perf_counter()
logger.info("patching driver executable %s" % self.executable_path) logger.info("patching driver executable %s" % self.executable_path)
with io.open(self.executable_path, "r+b") as fh: with io.open(self.executable_path, "r+b") as fh:
content = fh.read() #content = fh.read()
match_injected_codeblock = re.search(rb"{window.*;}", content) #match_injected_codeblock = re.search(rb"{window.*;}", content)
if match_injected_codeblock: #if match_injected_codeblock:
target_bytes = match_injected_codeblock[0] # target_bytes = match_injected_codeblock[0]
new_target_bytes = ( # new_target_bytes = (
b'{console.log("undetected chromedriver 1337!")}'.ljust( # b'{console.log("undetected chromedriver 1337!")}'.ljust(
len(target_bytes), b" " # len(target_bytes), b" "
) # )
) # )
new_content = content.replace(target_bytes, new_target_bytes) # new_content = content.replace(target_bytes, new_target_bytes)
if new_content == content: # if new_content == content:
logger.warning( # logger.warning(
"something went wrong patching the driver binary. could not find injection code block" # "something went wrong patching the driver binary. could not find injection code block"
) # )
else: # else:
logger.debug( # logger.debug(
"found block:\n%s\nreplacing with:\n%s" # "found block:\n%s\nreplacing with:\n%s"
% (target_bytes, new_target_bytes) # % (target_bytes, new_target_bytes)
) # )
fh.seek(0) # fh.seek(0)
fh.write(new_content) # fh.write(new_content)
# we just keep the cdc variables as they can't be injected anyways so no harm # we just keep the cdc variables as they can't be injected anyways so no harm
# keeping for reference # keeping for reference
# fh.seek(0) # fh.seek(0)
# for line in iter( lambda: fh.readline() , b"" ): for line in iter( lambda: fh.readline() , b"" ):
# if b'cdc_' in line: if b'cdc_' in line:
# fh.seek( -len( line ) , 1 ) fh.seek( -len( line ) , 1 )
# new_line = re.sub( b"cdc_.{22}_" , self.gen_random_cdc() , line ) new_line = re.sub( b"cdc_.{22}_" , self.gen_random_cdc() , line )
# logger.debug( 'replaced %s\n\twith:%s' % (line , new_line) ) logger.debug( 'replaced %s\n\twith:%s' % (line , new_line) )
# fh.write( new_line ) fh.write( new_line )
else: # else:
logger.info("%s seems already patched ?!?!" % self.executable_path) # logger.info("%s seems already patched ?!?!" % self.executable_path)
logger.debug( logger.debug(
"patching took us {:.2f} seconds".format(time.perf_counter() - start) "patching took us {:.2f} seconds".format(time.perf_counter() - start)
) )