Merge branch 'master' into master

This commit is contained in:
kaliiiiiiiiiii 2023-06-07 07:08:33 +02:00 committed by GitHub
commit ac068c0d29
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 288 additions and 23 deletions

51
.github/workflows/workflow.yml vendored Normal file
View File

@ -0,0 +1,51 @@
name: Python package
on:
push:
branches: [ "master" ]
pull_request:
branches: [ "master" ]
jobs:
build:
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
python-version: ["3.8", "3.9", "3.10","3.11"]
steps:
- uses: actions/checkout@v3
- name: Setup Chrome
uses: browser-actions/setup-chrome@v1.2.0
with:
chrome-version: stable
- name: set chrome in path
run: |
echo "/opt/hostedtoolcache/chromium/stable/x64" >> $GITHUB_PATH
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v3
with:
python-version: ${{ matrix.python-version }}
- name: Install package
run: |
python -m pip install --upgrade pip
if [ -f requirements.txt ]; then pip install -r requirements.txt; else pip install -U . ; fi
- name: run example
run: |
python example/test_workflow.py
- name: Upload a Build Artifact
uses: actions/upload-artifact@v3.1.2
with:
# Artifact name
name: screenshots
# A file, directory or wildcard pattern that describes what to upload
path: /home/runner/work/_temp/*p*

View File

@ -1,10 +1,13 @@
import time import time
import logging
logging.basicConfig(level=10)
from selenium.common.exceptions import WebDriverException from selenium.common.exceptions import WebDriverException
from selenium.webdriver.remote.webdriver import By from selenium.webdriver.remote.webdriver import By
import selenium.webdriver.support.expected_conditions as EC # noqa import selenium.webdriver.support.expected_conditions as EC # noqa
from selenium.webdriver.support.wait import WebDriverWait from selenium.webdriver.support.wait import WebDriverWait
import undetected_chromedriver as uc import undetected_chromedriver as uc
@ -164,7 +167,8 @@ def main(args=None):
print("lets go to UC project page") print("lets go to UC project page")
driver.get("https://www.github.com/ultrafunkamsterdam/undetected-chromedriver") driver.get("https://www.github.com/ultrafunkamsterdam/undetected-chromedriver")
input("press a key if you have RTFM")
sleep(2)
driver.quit() driver.quit()

121
example/test_workflow.py Normal file
View File

@ -0,0 +1,121 @@
# coding: utf-8
import time
import logging
import os
from selenium.webdriver.support.wait import WebDriverWait
import selenium.webdriver.support.expected_conditions as EC
from selenium.common.exceptions import TimeoutException
import undetected_chromedriver as uc
from pathlib import Path
logging.basicConfig(level=10)
logger = logging.getLogger('test')
def main():
####
# this block is a dirty helper since
# in the action runner devices serveral chrome versions exists
# and i need to ensure it takes the one which is installed
# by the task.
####
for k,v in os.environ.items():
logger.info("%s = %s" % (k,v))
logger.info('==== END ENV ==== ')
tmp = Path('/tmp').resolve()
for item in tmp.rglob('**'):
logger.info('found %s ' % item)
if item.is_dir():
if 'chrome-' in item.name:
logger.info('adding %s to PATH' % str(item))
logger.info('current PATH: %s' % str(os.environ.get('PATH')))
path_list = os.environ['PATH'].split(os.pathsep)
path_list.insert(0, str(item))
os.environ['PATH'] = os.pathsep.join(path_list)
logger.info('new PATH %s:' % str(os.environ.get('PATH')))
browser_executable_path = str(item / 'chrome')
break
####
# test really starts here
#3##
driver = uc.Chrome(headless=True, browser_executable_path=browser_executable_path)
logging.getLogger().setLevel(10)
driver.get('chrome://version')
driver.save_screenshot('/home/runner/work/_temp/versioninfo.png')
driver.get('chrome://settings/help')
driver.save_screenshot('/home/runner/work/_temp/helpinfo.png')
driver.get('https://www.google.com')
driver.save_screenshot('/home/runner/work/_temp/google.com.png')
driver.get('https://bot.incolumitas.com/#botChallenge')
pdfdata = driver.execute_cdp_cmd('Page.printToPDF', {})
if pdfdata:
if 'data' in pdfdata:
data = pdfdata['data']
import base64
buffer = base64.b64decode(data)
with open('/home/runner/work/_temp/report.pdf', 'w+b') as f:
f.write(buffer)
driver.get('https://www.nowsecure.nl')
logger.info('current url %s' % driver.current_url)
try:
WebDriverWait(driver,15).until(EC.title_contains('moment'))
except TimeoutException:
pass
logger.info('current page source:\n%s' % driver.page_source)
logger.info('current url %s' % driver.current_url)
try:
WebDriverWait(driver,15).until(EC.title_contains('nowSecure'))
logger.info('PASSED CLOUDFLARE!')
except TimeoutException:
logger.info('timeout')
print(driver.current_url)
logger.info('current page source:\n%s\n' % driver.page_source)
#logger.info('trying to save a screenshot via imgur')
driver.save_screenshot('/home/runner/work/_temp/nowsecure.png')
#driver.get('https://imgur.com/upload')
#driver.find_element('css selector', 'input').send_keys('/home/runner/work/_temp/nowsecure.png')
#time.sleep(1)
#logger.info('current url %s' % driver.current_url)
#time.sleep(1)
#logger.info(f'A SCREENSHOT IS SAVED ON {driver.current_url} <<< if this ends onlywith /upload than it failed. after all we are running from a datacenter no human being would ever surf the internet from ')
#time.sleep(5)
driver.quit()
if __name__ == "__main__":
main()

View File

@ -17,7 +17,7 @@ by UltrafunkAmsterdam (https://github.com/ultrafunkamsterdam)
from __future__ import annotations from __future__ import annotations
__version__ = "3.4.6" __version__ = "3.4.7"
import json import json
import logging import logging
@ -123,6 +123,7 @@ class Chrome(selenium.webdriver.chrome.webdriver.WebDriver):
use_subprocess=True, use_subprocess=True,
debug=False, debug=False,
no_sandbox=True, no_sandbox=True,
user_multi_procs: bool = False,
**kw, **kw,
): ):
""" """
@ -234,6 +235,14 @@ class Chrome(selenium.webdriver.chrome.webdriver.WebDriver):
uses the --no-sandbox option, and additionally does suppress the "unsecure option" status bar uses the --no-sandbox option, and additionally does suppress the "unsecure option" status bar
this option has a default of True since many people seem to run this as root (....) , and chrome does not start this option has a default of True since many people seem to run this as root (....) , and chrome does not start
when running as root without using --no-sandbox flag. when running as root without using --no-sandbox flag.
user_multi_procs:
set to true when you are using multithreads/multiprocessing
ensures not all processes are trying to modify a binary which is in use by another.
for this to work. YOU MUST HAVE AT LEAST 1 UNDETECTED_CHROMEDRIVER BINARY IN YOUR ROAMING DATA FOLDER.
this requirement can be easily satisfied, by just running this program "normal" and close/kill it.
""" """
finalize(self, self._ensure_close, self) finalize(self, self._ensure_close, self)
@ -242,8 +251,11 @@ class Chrome(selenium.webdriver.chrome.webdriver.WebDriver):
executable_path=driver_executable_path, executable_path=driver_executable_path,
force=patcher_force_close, force=patcher_force_close,
version_main=version_main, version_main=version_main,
user_multi_procs=user_multi_procs,
) )
# self.patcher.auto(user_multiprocess = user_multi_num_procs)
self.patcher.auto() self.patcher.auto()
# self.patcher = patcher # self.patcher = patcher
if not options: if not options:
options = ChromeOptions() options = ChromeOptions()
@ -383,10 +395,12 @@ class Chrome(selenium.webdriver.chrome.webdriver.WebDriver):
options.arguments.extend(["--no-sandbox", "--test-type"]) options.arguments.extend(["--no-sandbox", "--test-type"])
if headless or options.headless: if headless or options.headless:
if self.patcher.version_main < 108: #workaround until a better checking is found
options.add_argument("--headless=chrome") options.add_argument("--headless=new")
elif self.patcher.version_main >= 108: #if self.patcher.version_main < 108:
options.add_argument("--headless=new") # options.add_argument("--headless=chrome")
#elif self.patcher.version_main >= 108:
if not keep_window_size: if not keep_window_size:
options.add_argument("--window-size=1920,1080") options.add_argument("--window-size=1920,1080")
options.add_argument("--start-maximized") options.add_argument("--start-maximized")
@ -733,7 +747,7 @@ class Chrome(selenium.webdriver.chrome.webdriver.WebDriver):
os.kill(self.browser_pid, 15) os.kill(self.browser_pid, 15)
logger.debug("gracefully closed browser") logger.debug("gracefully closed browser")
except Exception as e: # noqa except Exception as e: # noqa
logger.debug(e, exc_info=True) pass
if ( if (
hasattr(self, "keep_user_data_dir") hasattr(self, "keep_user_data_dir")
and hasattr(self, "user_data_dir") and hasattr(self, "user_data_dir")
@ -852,5 +866,7 @@ def find_chrome_executable():
): ):
candidates.add(os.sep.join((item, subitem, "chrome.exe"))) candidates.add(os.sep.join((item, subitem, "chrome.exe")))
for candidate in candidates: for candidate in candidates:
logger.debug('checking if %s exists and is executable' % candidate)
if os.path.exists(candidate) and os.access(candidate, os.X_OK): if os.path.exists(candidate) and os.access(candidate, os.X_OK):
logger.debug('found! using %s' % candidate)
return os.path.normpath(candidate) return os.path.normpath(candidate)

View File

@ -5,15 +5,17 @@ from distutils.version import LooseVersion
import io import io
import logging import logging
import os import os
import pathlib
import random import random
import re import re
import shutil
import string import string
import sys import sys
import time import time
from urllib.request import urlopen from urllib.request import urlopen
from urllib.request import urlretrieve from urllib.request import urlretrieve
import zipfile import zipfile
from multiprocessing import Lock
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -21,6 +23,7 @@ IS_POSIX = sys.platform.startswith(("darwin", "cygwin", "linux", "linux2"))
class Patcher(object): class Patcher(object):
lock = Lock()
url_repo = "https://chromedriver.storage.googleapis.com" url_repo = "https://chromedriver.storage.googleapis.com"
zip_name = "chromedriver_%s.zip" zip_name = "chromedriver_%s.zip"
exe_name = "chromedriver%s" exe_name = "chromedriver%s"
@ -48,7 +51,13 @@ class Patcher(object):
d = "~/.undetected_chromedriver" d = "~/.undetected_chromedriver"
data_path = os.path.abspath(os.path.expanduser(d)) data_path = os.path.abspath(os.path.expanduser(d))
def __init__(self, executable_path=None, force=False, version_main: int = 0): def __init__(
self,
executable_path=None,
force=False,
version_main: int = 0,
user_multi_procs=False,
):
""" """
Args: Args:
executable_path: None = automatic executable_path: None = automatic
@ -61,6 +70,7 @@ class Patcher(object):
self.force = force self.force = force
self._custom_exe_path = False self._custom_exe_path = False
prefix = "undetected" prefix = "undetected"
self.user_multi_procs = user_multi_procs
if not os.path.exists(self.data_path): if not os.path.exists(self.data_path):
os.makedirs(self.data_path, exist_ok=True) os.makedirs(self.data_path, exist_ok=True)
@ -78,17 +88,41 @@ class Patcher(object):
self.zip_path = os.path.join(self.data_path, prefix) self.zip_path = os.path.join(self.data_path, prefix)
if not executable_path: if not executable_path:
self.executable_path = os.path.abspath( if not self.user_multi_procs:
os.path.join(".", self.executable_path) self.executable_path = os.path.abspath(
) os.path.join(".", self.executable_path)
)
if executable_path: if executable_path:
self._custom_exe_path = True self._custom_exe_path = True
self.executable_path = executable_path self.executable_path = executable_path
self.version_main = version_main self.version_main = version_main
self.version_full = None self.version_full = None
def auto(self, executable_path=None, force=False, version_main=None): def auto(self, executable_path=None, force=False, version_main=None, _=None):
"""
Args:
executable_path:
force:
version_main:
Returns:
"""
# if self.user_multi_procs and \
# self.user_multi_procs != -1:
# # -1 being a skip value used later in this block
#
p = pathlib.Path(self.data_path)
with Lock():
files = list(p.rglob("*chromedriver*?"))
for file in files:
if self.is_binary_patched(file):
self.executable_path = str(file)
return True
if executable_path: if executable_path:
self.executable_path = executable_path self.executable_path = executable_path
self._custom_exe_path = True self._custom_exe_path = True
@ -127,6 +161,49 @@ class Patcher(object):
self.unzip_package(self.fetch_package()) self.unzip_package(self.fetch_package())
return self.patch() return self.patch()
def driver_binary_in_use(self, path: str = None) -> bool:
"""
naive test to check if a found chromedriver binary is
currently in use
Args:
path: a string or PathLike object to the binary to check.
if not specified, we check use this object's executable_path
"""
if not path:
path = self.executable_path
p = pathlib.Path(path)
if not p.exists():
raise OSError("file does not exist: %s" % p)
try:
with open(p, mode="a+b") as fs:
exc = []
try:
fs.seek(0, 0)
except PermissionError as e:
exc.append(e) # since some systems apprently allow seeking
# we conduct another test
try:
fs.readline()
except PermissionError as e:
exc.append(e)
if exc:
return True
return False
# ok safe to assume this is in use
except Exception as e:
# logger.exception("whoops ", e)
pass
def cleanup_unused_files(self):
p = pathlib.Path(self.data_path)
items = list(p.glob("*undetected*"))
print(items)
def patch(self): def patch(self):
self.patch_exe() self.patch_exe()
return self.is_binary_patched() return self.is_binary_patched()
@ -255,21 +332,17 @@ class Patcher(object):
else: else:
timeout = 3 # stop trying after this many seconds timeout = 3 # stop trying after this many seconds
t = time.monotonic() t = time.monotonic()
while True: now = lambda: time.monotonic()
now = time.monotonic() while now() - t > timeout:
if now - t > timeout: # we don't want to wait until the end of time
# we don't want to wait until the end of time
logger.debug(
"could not unlink %s in time (%d seconds)"
% (self.executable_path, timeout)
)
break
try: try:
if self.user_multi_procs:
break
os.unlink(self.executable_path) os.unlink(self.executable_path)
logger.debug("successfully unlinked %s" % self.executable_path) logger.debug("successfully unlinked %s" % self.executable_path)
break break
except (OSError, RuntimeError, PermissionError): except (OSError, RuntimeError, PermissionError):
time.sleep(0.1) time.sleep(0.01)
continue continue
except FileNotFoundError: except FileNotFoundError:
break break