From 65f6e807804d2af5e00f2aecd72bfc43af19324a Mon Sep 17 00:00:00 2001 From: pukkandan Date: Tue, 28 Feb 2023 23:10:54 +0530 Subject: [PATCH] [dependencies] Simplify `Cryptodome` Closes #6292, closes #6272, closes #6338 --- test/test_aes.py | 4 +-- yt_dlp/__pyinstaller/hook-yt_dlp.py | 28 +--------------- yt_dlp/aes.py | 6 ++-- yt_dlp/compat/_legacy.py | 2 +- yt_dlp/compat/compat_utils.py | 2 +- yt_dlp/dependencies/Cryptodome.py | 50 ++++++++++++++++++----------- yt_dlp/dependencies/__init__.py | 2 +- yt_dlp/downloader/hls.py | 2 +- yt_dlp/extractor/bilibili.py | 6 ++-- yt_dlp/extractor/ivi.py | 8 ++--- yt_dlp/extractor/wrestleuniverse.py | 6 ++-- 11 files changed, 52 insertions(+), 64 deletions(-) diff --git a/test/test_aes.py b/test/test_aes.py index 18f15fecb..a26abfd7d 100644 --- a/test/test_aes.py +++ b/test/test_aes.py @@ -48,7 +48,7 @@ class TestAES(unittest.TestCase): data = b'\x97\x92+\xe5\x0b\xc3\x18\x91ky9m&\xb3\xb5@\xe6\x27\xc2\x96.\xc8u\x88\xab9-[\x9e|\xf1\xcd' decrypted = intlist_to_bytes(aes_cbc_decrypt(bytes_to_intlist(data), self.key, self.iv)) self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg) - if Cryptodome: + if Cryptodome.AES: decrypted = aes_cbc_decrypt_bytes(data, intlist_to_bytes(self.key), intlist_to_bytes(self.iv)) self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg) @@ -78,7 +78,7 @@ class TestAES(unittest.TestCase): decrypted = intlist_to_bytes(aes_gcm_decrypt_and_verify( bytes_to_intlist(data), self.key, bytes_to_intlist(authentication_tag), self.iv[:12])) self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg) - if Cryptodome: + if Cryptodome.AES: decrypted = aes_gcm_decrypt_and_verify_bytes( data, intlist_to_bytes(self.key), authentication_tag, intlist_to_bytes(self.iv[:12])) self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg) diff --git a/yt_dlp/__pyinstaller/hook-yt_dlp.py b/yt_dlp/__pyinstaller/hook-yt_dlp.py index 057cfef2f..63dcdffe0 100644 --- a/yt_dlp/__pyinstaller/hook-yt_dlp.py +++ b/yt_dlp/__pyinstaller/hook-yt_dlp.py @@ -1,30 +1,8 @@ -import ast -import os import sys -from pathlib import Path from PyInstaller.utils.hooks import collect_submodules -def find_attribute_accesses(node, name, path=()): - if isinstance(node, ast.Attribute): - path = [*path, node.attr] - if isinstance(node.value, ast.Name) and node.value.id == name: - yield path[::-1] - for child in ast.iter_child_nodes(node): - yield from find_attribute_accesses(child, name, path) - - -def collect_used_submodules(name, level): - for dirpath, _, filenames in os.walk(Path(__file__).parent.parent): - for filename in filenames: - if not filename.endswith('.py'): - continue - with open(Path(dirpath) / filename, encoding='utf8') as f: - for submodule in find_attribute_accesses(ast.parse(f.read()), name): - yield '.'.join(submodule[:level]) - - def pycryptodome_module(): try: import Cryptodome # noqa: F401 @@ -41,12 +19,8 @@ def pycryptodome_module(): def get_hidden_imports(): yield 'yt_dlp.compat._legacy' + yield pycryptodome_module() yield from collect_submodules('websockets') - - crypto = pycryptodome_module() - for sm in set(collect_used_submodules('Cryptodome', 2)): - yield f'{crypto}.{sm}' - # These are auto-detected, but explicitly add them just in case yield from ('mutagen', 'brotli', 'certifi') diff --git a/yt_dlp/aes.py b/yt_dlp/aes.py index deff0a2b3..b3a383cd9 100644 --- a/yt_dlp/aes.py +++ b/yt_dlp/aes.py @@ -5,14 +5,14 @@ from .compat import compat_ord from .dependencies import Cryptodome from .utils import bytes_to_intlist, intlist_to_bytes -if Cryptodome: +if Cryptodome.AES: def aes_cbc_decrypt_bytes(data, key, iv): """ Decrypt bytes with AES-CBC using pycryptodome """ - return Cryptodome.Cipher.AES.new(key, Cryptodome.Cipher.AES.MODE_CBC, iv).decrypt(data) + return Cryptodome.AES.new(key, Cryptodome.AES.MODE_CBC, iv).decrypt(data) def aes_gcm_decrypt_and_verify_bytes(data, key, tag, nonce): """ Decrypt bytes with AES-GCM using pycryptodome """ - return Cryptodome.Cipher.AES.new(key, Cryptodome.Cipher.AES.MODE_GCM, nonce).decrypt_and_verify(data, tag) + return Cryptodome.AES.new(key, Cryptodome.AES.MODE_GCM, nonce).decrypt_and_verify(data, tag) else: def aes_cbc_decrypt_bytes(data, key, iv): diff --git a/yt_dlp/compat/_legacy.py b/yt_dlp/compat/_legacy.py index 84d749209..83bf869a8 100644 --- a/yt_dlp/compat/_legacy.py +++ b/yt_dlp/compat/_legacy.py @@ -32,9 +32,9 @@ from re import match as compat_Match # noqa: F401 from . import compat_expanduser, compat_HTMLParseError, compat_realpath from .compat_utils import passthrough_module -from ..dependencies import Cryptodome_AES as compat_pycrypto_AES # noqa: F401 from ..dependencies import brotli as compat_brotli # noqa: F401 from ..dependencies import websockets as compat_websockets # noqa: F401 +from ..dependencies.Cryptodome import AES as compat_pycrypto_AES # noqa: F401 passthrough_module(__name__, '...utils', ('WINDOWS_VT_MODE', 'windows_enable_vt_mode')) diff --git a/yt_dlp/compat/compat_utils.py b/yt_dlp/compat/compat_utils.py index 8956b3bf1..3ca46d270 100644 --- a/yt_dlp/compat/compat_utils.py +++ b/yt_dlp/compat/compat_utils.py @@ -48,7 +48,7 @@ def passthrough_module(parent, child, allowed_attributes=(..., ), *, callback=la """Passthrough parent module into a child module, creating the parent if necessary""" def __getattr__(attr): if _is_package(parent): - with contextlib.suppress(ImportError): + with contextlib.suppress(ModuleNotFoundError): return importlib.import_module(f'.{attr}', parent.__name__) ret = from_child(attr) diff --git a/yt_dlp/dependencies/Cryptodome.py b/yt_dlp/dependencies/Cryptodome.py index 2adc51374..a50bce4d4 100644 --- a/yt_dlp/dependencies/Cryptodome.py +++ b/yt_dlp/dependencies/Cryptodome.py @@ -1,8 +1,5 @@ import types -from ..compat import functools -from ..compat.compat_utils import passthrough_module - try: import Cryptodome as _parent except ImportError: @@ -12,19 +9,36 @@ except ImportError: _parent = types.ModuleType('no_Cryptodome') __bool__ = lambda: False -passthrough_module(__name__, _parent, (..., '__version__')) -del passthrough_module - - -@property -@functools.cache -def _yt_dlp__identifier(): - if _parent.__name__ == 'Crypto': +__version__ = '' +AES = PKCS1_v1_5 = Blowfish = PKCS1_OAEP = SHA1 = CMAC = RSA = None +try: + if _parent.__name__ == 'Cryptodome': + from Cryptodome import __version__ + from Cryptodome.Cipher import AES + from Cryptodome.Cipher import PKCS1_v1_5 + from Cryptodome.Cipher import Blowfish + from Cryptodome.Cipher import PKCS1_OAEP + from Cryptodome.Hash import SHA1 + from Cryptodome.Hash import CMAC + from Cryptodome.PublicKey import RSA + elif _parent.__name__ == 'Crypto': + from Crypto import __version__ from Crypto.Cipher import AES - try: - # In pycrypto, mode defaults to ECB. See: - # https://www.pycryptodome.org/en/latest/src/vs_pycrypto.html#:~:text=not%20have%20ECB%20as%20default%20mode - AES.new(b'abcdefghijklmnop') - except TypeError: - return 'pycrypto' - return _parent.__name__ + from Crypto.Cipher import PKCS1_v1_5 + from Crypto.Cipher import Blowfish + from Crypto.Cipher import PKCS1_OAEP + from Crypto.Hash import SHA1 + from Crypto.Hash import CMAC + from Crypto.PublicKey import RSA +except ImportError: + __version__ = f'broken {__version__}'.strip() + + +_yt_dlp__identifier = _parent.__name__ +if AES and _yt_dlp__identifier == 'Crypto': + try: + # In pycrypto, mode defaults to ECB. See: + # https://www.pycryptodome.org/en/latest/src/vs_pycrypto.html#:~:text=not%20have%20ECB%20as%20default%20mode + AES.new(b'abcdefghijklmnop') + except TypeError: + _yt_dlp__identifier = 'pycrypto' diff --git a/yt_dlp/dependencies/__init__.py b/yt_dlp/dependencies/__init__.py index c2214e6db..6e7d29c5c 100644 --- a/yt_dlp/dependencies/__init__.py +++ b/yt_dlp/dependencies/__init__.py @@ -73,7 +73,7 @@ available_dependencies = {k: v for k, v in all_dependencies.items() if v} # Deprecated -Cryptodome_AES = Cryptodome.Cipher.AES if Cryptodome else None +Cryptodome_AES = Cryptodome.AES __all__ = [ diff --git a/yt_dlp/downloader/hls.py b/yt_dlp/downloader/hls.py index 29d6f6241..f2868dc52 100644 --- a/yt_dlp/downloader/hls.py +++ b/yt_dlp/downloader/hls.py @@ -70,7 +70,7 @@ class HlsFD(FragmentFD): can_download, message = self.can_download(s, info_dict, self.params.get('allow_unplayable_formats')), None if can_download: has_ffmpeg = FFmpegFD.available() - no_crypto = not Cryptodome and '#EXT-X-KEY:METHOD=AES-128' in s + no_crypto = not Cryptodome.AES and '#EXT-X-KEY:METHOD=AES-128' in s if no_crypto and has_ffmpeg: can_download, message = False, 'The stream has AES-128 encryption and pycryptodomex is not available' elif no_crypto: diff --git a/yt_dlp/extractor/bilibili.py b/yt_dlp/extractor/bilibili.py index f4180633a..2252840b3 100644 --- a/yt_dlp/extractor/bilibili.py +++ b/yt_dlp/extractor/bilibili.py @@ -894,15 +894,15 @@ class BiliIntlBaseIE(InfoExtractor): } def _perform_login(self, username, password): - if not Cryptodome: + if not Cryptodome.RSA: raise ExtractorError('pycryptodomex not found. Please install', expected=True) key_data = self._download_json( 'https://passport.bilibili.tv/x/intl/passport-login/web/key?lang=en-US', None, note='Downloading login key', errnote='Unable to download login key')['data'] - public_key = Cryptodome.PublicKey.RSA.importKey(key_data['key']) - password_hash = Cryptodome.Cipher.PKCS1_v1_5.new(public_key).encrypt((key_data['hash'] + password).encode('utf-8')) + public_key = Cryptodome.RSA.importKey(key_data['key']) + password_hash = Cryptodome.PKCS1_v1_5.new(public_key).encrypt((key_data['hash'] + password).encode('utf-8')) login_post = self._download_json( 'https://passport.bilibili.tv/x/intl/passport-login/web/login/password?lang=en-US', None, data=urlencode_postdata({ 'username': username, diff --git a/yt_dlp/extractor/ivi.py b/yt_dlp/extractor/ivi.py index 96220bea9..fa5ceec95 100644 --- a/yt_dlp/extractor/ivi.py +++ b/yt_dlp/extractor/ivi.py @@ -91,7 +91,7 @@ class IviIE(InfoExtractor): for site in (353, 183): content_data = (data % site).encode() if site == 353: - if not Cryptodome: + if not Cryptodome.CMAC: continue timestamp = (self._download_json( @@ -105,8 +105,8 @@ class IviIE(InfoExtractor): query = { 'ts': timestamp, - 'sign': Cryptodome.Hash.CMAC.new(self._LIGHT_KEY, timestamp.encode() + content_data, - Cryptodome.Cipher.Blowfish).hexdigest(), + 'sign': Cryptodome.CMAC.new(self._LIGHT_KEY, timestamp.encode() + content_data, + Cryptodome.Blowfish).hexdigest(), } else: query = {} @@ -126,7 +126,7 @@ class IviIE(InfoExtractor): extractor_msg = 'Video %s does not exist' elif site == 353: continue - elif not Cryptodome: + elif not Cryptodome.CMAC: raise ExtractorError('pycryptodomex not found. Please install', expected=True) elif message: extractor_msg += ': ' + message diff --git a/yt_dlp/extractor/wrestleuniverse.py b/yt_dlp/extractor/wrestleuniverse.py index 78e7c83ab..5c6dec2c4 100644 --- a/yt_dlp/extractor/wrestleuniverse.py +++ b/yt_dlp/extractor/wrestleuniverse.py @@ -50,10 +50,10 @@ class WrestleUniverseBaseIE(InfoExtractor): data=data, headers=headers, query=query, fatal=fatal) def _call_encrypted_api(self, video_id, param='', msg='API', data={}, query={}, fatal=True): - if not Cryptodome: + if not Cryptodome.RSA: raise ExtractorError('pycryptodomex not found. Please install', expected=True) - private_key = Cryptodome.PublicKey.RSA.generate(2048) - cipher = Cryptodome.Cipher.PKCS1_OAEP.new(private_key, hashAlgo=Cryptodome.Hash.SHA1) + private_key = Cryptodome.RSA.generate(2048) + cipher = Cryptodome.PKCS1_OAEP.new(private_key, hashAlgo=Cryptodome.SHA1) def decrypt(data): if not data: