From 4cf1e5d2f9a145de9749e90a1d244350c82ef610 Mon Sep 17 00:00:00 2001
From: shirt <2660574+shirt-dev@users.noreply.github.com>
Date: Fri, 12 Mar 2021 23:46:58 -0500
Subject: [PATCH] Native concurrent downloading of fragments (#166)

* Option `--concurrent-fragments` (`-N`) to set the number of threads

Related: #165

Known issues:

* When receiving Ctrl+C, the process will exit only after the fragments
  currently being downloaded have finished
* The download progress shows the speed of only one thread

Authored by shirt-dev
---
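
Notes:

Both dash.py and hls.py below rely on the same pattern: every fragment is
submitted to a ThreadPoolExecutor, and the main thread then polls
concurrent.futures.wait() in a loop instead of blocking, so that Ctrl+C is
still delivered to it between polls. A minimal standalone sketch of that
pattern (the `download` function and `jobs` list are illustrative, not part
of this patch):

    import concurrent.futures

    def download(name):
        return name.upper()  # stand-in for the real per-fragment work

    jobs = ['frag0', 'frag1', 'frag2']
    with concurrent.futures.ThreadPoolExecutor(2) as pool:
        futures = [pool.submit(download, job) for job in jobs]
        # timeout=0 returns immediately with whatever has already finished
        done, not_done = concurrent.futures.wait(futures, timeout=0)
        try:
            while not_done:
                # Wake up every second so KeyboardInterrupt can be handled
                freshly_done, not_done = concurrent.futures.wait(not_done, timeout=1)
                done |= freshly_done
        except KeyboardInterrupt:
            for future in not_done:
                future.cancel()
            concurrent.futures.wait(not_done, timeout=None)
            raise
    results = [future.result() for future in futures]

This also explains the first known issue above: Future.cancel() only
cancels tasks that have not started yet, so Ctrl+C takes effect once the
in-flight fragments finish.
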
 .gitignore                    |   1 +
 README.md                     |   2 +
 yt_dlp/__init__.py            |   3 +
 yt_dlp/downloader/dash.py     | 152 +++++++++++++++++++--------
 yt_dlp/downloader/external.py |   4 +-
 yt_dlp/downloader/hls.py      | 188 +++++++++++++++++++-----------
 yt_dlp/options.py             |   4 +
 7 files changed, 250 insertions(+), 104 deletions(-)

diff --git a/.gitignore b/.gitignore
index af0259f84..a2484b752 100644
--- a/.gitignore
+++ b/.gitignore
@@ -60,6 +60,7 @@ yt-dlp.zip
 *.mkv
 *.swf
 *.part
+*.part-*
 *.ytdl
 *.dump
 *.frag
diff --git a/README.md b/README.md
index f9005118c..4b19725fc 100644
--- a/README.md
+++ b/README.md
@@ -297,6 +297,8 @@ Then simply run `make`. You can also run `make yt-dlp` instead to compile only t
     --no-include-ads                 Do not download advertisements (default)
 
 ## Download Options:
+    -N, --concurrent-fragments N     Number of fragments to download
+                                     concurrently (default is 1)
     -r, --limit-rate RATE            Maximum download rate in bytes per second
                                      (e.g. 50K or 4.2M)
     -R, --retries RETRIES            Number of retries (default is 10), or
diff --git a/yt_dlp/__init__.py b/yt_dlp/__init__.py
index b8b8495e6..55b962be1 100644
--- a/yt_dlp/__init__.py
+++ b/yt_dlp/__init__.py
@@ -180,6 +180,8 @@ def _real_main(argv=None):
 
     if opts.overwrites:  # --yes-overwrites implies --no-continue
         opts.continue_dl = False
+    if opts.concurrent_fragment_downloads <= 0:
+        raise ValueError('Concurrent fragments must be positive')
 
     def parse_retries(retries, name=''):
         if retries in ('inf', 'infinite'):
@@ -463,6 +465,7 @@ def _real_main(argv=None):
         'extractor_retries': opts.extractor_retries,
         'skip_unavailable_fragments': opts.skip_unavailable_fragments,
         'keep_fragments': opts.keep_fragments,
+        'concurrent_fragment_downloads': opts.concurrent_fragment_downloads,
         'buffersize': opts.buffersize,
         'noresizebuffer': opts.noresizebuffer,
         'http_chunk_size': opts.http_chunk_size,
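
The option is plumbed through to the downloaders as the
`concurrent_fragment_downloads` params key above, so it is also available
to embedders. A sketch, assuming the usual yt_dlp embedding entry point and
a placeholder URL:

    import yt_dlp

    ydl_opts = {'concurrent_fragment_downloads': 4}
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        ydl.download(['https://example.com/master.m3u8'])
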
diff --git a/yt_dlp/downloader/dash.py b/yt_dlp/downloader/dash.py
index 6eae5bf0a..32e510d38 100644
--- a/yt_dlp/downloader/dash.py
+++ b/yt_dlp/downloader/dash.py
@@ -1,11 +1,18 @@
 from __future__ import unicode_literals
 
+try:
+    import concurrent.futures
+    can_threaded_download = True
+except ImportError:
+    can_threaded_download = False
+
 from ..downloader import _get_real_downloader
 from .fragment import FragmentFD
 
 from ..compat import compat_urllib_error
 from ..utils import (
     DownloadError,
+    sanitize_open,
     urljoin,
 )
 
@@ -49,47 +56,11 @@ class DashSegmentsFD(FragmentFD):
                 assert fragment_base_url
                 fragment_url = urljoin(fragment_base_url, fragment['path'])
 
-            if real_downloader:
-                fragments_to_download.append({
-                    'url': fragment_url,
-                })
-                continue
-
-            # In DASH, the first segment contains necessary headers to
-            # generate a valid MP4 file, so always abort for the first segment
-            fatal = i == 0 or not skip_unavailable_fragments
-            count = 0
-            while count <= fragment_retries:
-                try:
-                    success, frag_content = self._download_fragment(ctx, fragment_url, info_dict)
-                    if not success:
-                        return False
-                    self._append_fragment(ctx, frag_content)
-                    break
-                except compat_urllib_error.HTTPError as err:
-                    # YouTube may often return 404 HTTP error for a fragment causing the
-                    # whole download to fail. However if the same fragment is immediately
-                    # retried with the same request data this usually succeeds (1-2 attempts
-                    # is usually enough) thus allowing to download the whole file successfully.
-                    # To be future-proof we will retry all fragments that fail with any
-                    # HTTP error.
-                    count += 1
-                    if count <= fragment_retries:
-                        self.report_retry_fragment(err, frag_index, count, fragment_retries)
-                except DownloadError:
-                    # Don't retry fragment if error occurred during HTTP downloading
-                    # itself since it has own retry settings
-                    if not fatal:
-                        self.report_skip_fragment(frag_index)
-                        break
-                    raise
-
-            if count > fragment_retries:
-                if not fatal:
-                    self.report_skip_fragment(frag_index)
-                    continue
-                self.report_error('giving up after %s fragment retries' % fragment_retries)
-                return False
+            fragments_to_download.append({
+                'frag_index': frag_index,
+                'index': i,
+                'url': fragment_url,
+            })
 
         if real_downloader:
             info_copy = info_dict.copy()
@@ -102,5 +73,104 @@ class DashSegmentsFD(FragmentFD):
             if not success:
                 return False
         else:
+            def download_fragment(fragment):
+                i = fragment['index']
+                frag_index = fragment['frag_index']
+                fragment_url = fragment['url']
+
+                ctx['fragment_index'] = frag_index
+
+                # In DASH, the first segment contains necessary headers to
+                # generate a valid MP4 file, so always abort for the first segment
+                fatal = i == 0 or not skip_unavailable_fragments
+                count = 0
+                while count <= fragment_retries:
+                    try:
+                        success, frag_content = self._download_fragment(ctx, fragment_url, info_dict)
+                        if not success:
+                            return False, frag_index
+                        break
+                    except compat_urllib_error.HTTPError as err:
+                        # YouTube may often return 404 HTTP error for a fragment, causing the
+                        # whole download to fail. However, if the same fragment is immediately
+                        # retried with the same request data, this usually succeeds (1-2 attempts
+                        # are usually enough), thus allowing the whole file to be downloaded
+                        # successfully. To be future-proof we will retry all fragments that
+                        # fail with any HTTP error.
+                        count += 1
+                        if count <= fragment_retries:
+                            self.report_retry_fragment(err, frag_index, count, fragment_retries)
+                    except DownloadError:
+                        # Don't retry fragment if error occurred during HTTP downloading
+                        # itself since it has its own retry settings
+                        if not fatal:
+                            break
+                        raise
+
+                if count > fragment_retries:
+                    if not fatal:
+                        return False, frag_index
+                    self.report_error('giving up after %s fragment retries' % fragment_retries)
+                    return False, frag_index
+
+                return frag_content, frag_index
+
+            def append_fragment(frag_content, frag_index):
+                if frag_content:
+                    fragment_filename = '%s-Frag%d' % (ctx['tmpfilename'], frag_index)
+                    try:
+                        file, frag_sanitized = sanitize_open(fragment_filename, 'rb')
+                        ctx['fragment_filename_sanitized'] = frag_sanitized
+                        file.close()
+                        self._append_fragment(ctx, frag_content)
+                        return True
+                    except FileNotFoundError:
+                        if skip_unavailable_fragments:
+                            self.report_skip_fragment(frag_index)
+                            return True
+                        else:
+                            self.report_error(
+                                'fragment %s not found, unable to continue' % frag_index)
+                            return False
+                else:
+                    if skip_unavailable_fragments:
+                        self.report_skip_fragment(frag_index)
+                        return True
+                    else:
+                        self.report_error(
+                            'fragment %s not found, unable to continue' % frag_index)
+                        return False
+
+            max_workers = self.params.get('concurrent_fragment_downloads', 1)
+            if can_threaded_download and max_workers > 1:
+                self.report_warning('The download speed shown is only of one thread. This is a known issue')
+                with concurrent.futures.ThreadPoolExecutor(max_workers) as pool:
+                    futures = [pool.submit(download_fragment, fragment) for fragment in fragments_to_download]
+                    # timeout must be 0 to return instantly
+                    done, not_done = concurrent.futures.wait(futures, timeout=0)
+                    try:
+                        while not_done:
+                            # Check every 1 second for KeyboardInterrupt
+                            freshly_done, not_done = concurrent.futures.wait(not_done, timeout=1)
+                            done |= freshly_done
+                    except KeyboardInterrupt:
+                        for future in not_done:
+                            future.cancel()
+                        # timeout must be None to cancel
+                        concurrent.futures.wait(not_done, timeout=None)
+                        raise KeyboardInterrupt
+                results = [future.result() for future in futures]
+
+                for frag_content, frag_index in results:
+                    result = append_fragment(frag_content, frag_index)
+                    if not result:
+                        return False
+            else:
+                for fragment in fragments_to_download:
+                    frag_content, frag_index = download_fragment(fragment)
+                    result = append_fragment(frag_content, frag_index)
+                    if not result:
+                        return False
+
             self._finish_frag_download(ctx)
         return True
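
Note the design choice at the end of the threaded branch: results are
collected by iterating the original `futures` list rather than
concurrent.futures.as_completed(), so fragments are appended to the output
file in manifest order even when threads finish out of order. A small
illustration of the difference (illustrative only):

    import concurrent.futures

    with concurrent.futures.ThreadPoolExecutor(4) as pool:
        futures = [pool.submit(pow, n, 2) for n in range(6)]
        in_order = [f.result() for f in futures]  # always [0, 1, 4, 9, 16, 25]
        # as_completed() yields futures in completion order instead
        any_order = [f.result() for f in concurrent.futures.as_completed(futures)]
    print(in_order, sorted(any_order) == in_order)
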
diff --git a/yt_dlp/downloader/external.py b/yt_dlp/downloader/external.py
index c315deb2e..7c2d56d9c 100644
--- a/yt_dlp/downloader/external.py
+++ b/yt_dlp/downloader/external.py
@@ -126,7 +126,7 @@ class ExternalFD(FileDownloader):
             file_list = []
             dest, _ = sanitize_open(tmpfilename, 'wb')
             for i, fragment in enumerate(info_dict['fragments']):
-                file = '%s_%s.frag' % (tmpfilename, i)
+                file = '%s-Frag%d' % (tmpfilename, i)
                 decrypt_info = fragment.get('decrypt_info')
                 src, _ = sanitize_open(file, 'rb')
                 if decrypt_info:
@@ -274,7 +274,7 @@ class Aria2cFD(ExternalFD):
             url_list_file = '%s.frag.urls' % tmpfilename
             url_list = []
             for i, fragment in enumerate(info_dict['fragments']):
-                tmpsegmentname = '%s_%s.frag' % (os.path.basename(tmpfilename), i)
+                tmpsegmentname = '%s-Frag%d' % (os.path.basename(tmpfilename), i)
                 url_list.append('%s\n\tout=%s' % (fragment['url'], tmpsegmentname))
             stream, _ = sanitize_open(url_list_file, 'wb')
             stream.write('\n'.join(url_list).encode('utf-8'))
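
The rename from '%s_%s.frag' to '%s-Frag%d' keeps external downloaders on
the same fragment naming scheme as the new native code (and as the
'*.part-*' .gitignore entry), so fragment files can be reassembled the same
way in both paths. A sketch of the convention, with illustrative paths:

    import shutil

    tmpfilename = 'video.mp4.part'
    with open(tmpfilename, 'wb') as dest:
        for i in range(3):  # fragments video.mp4.part-Frag0 .. -Frag2
            with open('%s-Frag%d' % (tmpfilename, i), 'rb') as src:
                shutil.copyfileobj(src, dest)
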
diff --git a/yt_dlp/downloader/hls.py b/yt_dlp/downloader/hls.py
index 77606b0ed..8b7d51de3 100644
--- a/yt_dlp/downloader/hls.py
+++ b/yt_dlp/downloader/hls.py
@@ -7,6 +7,11 @@ try:
     can_decrypt_frag = True
 except ImportError:
     can_decrypt_frag = False
+try:
+    import concurrent.futures
+    can_threaded_download = True
+except ImportError:
+    can_threaded_download = False
 
 from ..downloader import _get_real_downloader
 from .fragment import FragmentFD
@@ -19,6 +24,7 @@ from ..compat import (
 )
 from ..utils import (
     parse_m3u8_attributes,
+    sanitize_open,
     update_url_query,
 )
 
@@ -151,7 +157,6 @@ class HlsFD(FragmentFD):
         ad_frag_next = False
         for line in s.splitlines():
             line = line.strip()
-            download_frag = False
             if line:
                 if not line.startswith('#'):
                     if format_index and discontinuity_count != format_index:
@@ -168,13 +173,13 @@ class HlsFD(FragmentFD):
                     if extra_query:
                         frag_url = update_url_query(frag_url, extra_query)
-                    if real_downloader:
-                        fragments.append({
-                            'url': frag_url,
-                            'decrypt_info': decrypt_info,
-                        })
-                        continue
-                    download_frag = True
+                    fragments.append({
+                        'frag_index': frag_index,
+                        'url': frag_url,
+                        'decrypt_info': decrypt_info,
+                        'byte_range': byte_range,
+                        'media_sequence': media_sequence,
+                    })
 
                 elif line.startswith('#EXT-X-MAP'):
                     if format_index and discontinuity_count != format_index:
                         continue
@@ -191,12 +196,14 @@ class HlsFD(FragmentFD):
                         else compat_urlparse.urljoin(man_url, map_info.get('URI')))
                     if extra_query:
                         frag_url = update_url_query(frag_url, extra_query)
-                    if real_downloader:
-                        fragments.append({
-                            'url': frag_url,
-                            'decrypt_info': decrypt_info,
-                        })
-                        continue
+
+                    fragments.append({
+                        'frag_index': frag_index,
+                        'url': frag_url,
+                        'decrypt_info': decrypt_info,
+                        'byte_range': byte_range,
+                        'media_sequence': media_sequence,
+                    })
 
                     if map_info.get('BYTERANGE'):
                         splitted_byte_range = map_info.get('BYTERANGE').split('@')
@@ -205,7 +212,6 @@ class HlsFD(FragmentFD):
                             'start': sub_range_start,
                             'end': sub_range_start + int(splitted_byte_range[0]),
                         }
-                    download_frag = True
 
                 elif line.startswith('#EXT-X-KEY'):
                     decrypt_url = decrypt_info.get('URI')
@@ -236,53 +242,12 @@ class HlsFD(FragmentFD):
                         ad_frag_next = False
                 elif line.startswith('#EXT-X-DISCONTINUITY'):
                     discontinuity_count += 1
+                i += 1
+                media_sequence += 1
 
-            if download_frag:
-                count = 0
-                headers = info_dict.get('http_headers', {})
-                if byte_range:
-                    headers['Range'] = 'bytes=%d-%d' % (byte_range['start'], byte_range['end'] - 1)
-                while count <= fragment_retries:
-                    try:
-                        success, frag_content = self._download_fragment(
-                            ctx, frag_url, info_dict, headers)
-                        if not success:
-                            return False
-                        break
-                    except compat_urllib_error.HTTPError as err:
-                        # Unavailable (possibly temporary) fragments may be served.
-                        # First we try to retry then either skip or abort.
-                        # See https://github.com/ytdl-org/youtube-dl/issues/10165,
-                        # https://github.com/ytdl-org/youtube-dl/issues/10448).
-                        count += 1
-                        if count <= fragment_retries:
-                            self.report_retry_fragment(err, frag_index, count, fragment_retries)
-                if count > fragment_retries:
-                    if skip_unavailable_fragments:
-                        i += 1
-                        media_sequence += 1
-                        self.report_skip_fragment(frag_index)
-                        continue
-                    self.report_error(
-                        'giving up after %s fragment retries' % fragment_retries)
-                    return False
-
-                if decrypt_info['METHOD'] == 'AES-128':
-                    iv = decrypt_info.get('IV') or compat_struct_pack('>8xq', media_sequence)
-                    decrypt_info['KEY'] = decrypt_info.get('KEY') or self.ydl.urlopen(
-                        self._prepare_url(info_dict, info_dict.get('_decryption_key_url') or decrypt_info['URI'])).read()
-                    # Don't decrypt the content in tests since the data is explicitly truncated and it's not to a valid block
-                    # size (see https://github.com/ytdl-org/youtube-dl/pull/27660). Tests only care that the correct data downloaded,
-                    # not what it decrypts to.
-                    if not test:
-                        frag_content = AES.new(
-                            decrypt_info['KEY'], AES.MODE_CBC, iv).decrypt(frag_content)
-                self._append_fragment(ctx, frag_content)
-                # We only download the first fragment during the test
-                if test:
-                    break
-                i += 1
-                media_sequence += 1
+            # We only download the first fragment during the test
+            if test:
+                fragments = [fragments[0] if fragments else None]
 
         if real_downloader:
             info_copy = info_dict.copy()
@@ -295,5 +260,106 @@ class HlsFD(FragmentFD):
             if not success:
                 return False
         else:
+            def download_fragment(fragment):
+                frag_index = fragment['frag_index']
+                frag_url = fragment['url']
+                decrypt_info = fragment['decrypt_info']
+                byte_range = fragment['byte_range']
+                media_sequence = fragment['media_sequence']
+
+                ctx['fragment_index'] = frag_index
+
+                count = 0
+                headers = info_dict.get('http_headers', {})
+                if byte_range:
+                    headers['Range'] = 'bytes=%d-%d' % (byte_range['start'], byte_range['end'] - 1)
+                while count <= fragment_retries:
+                    try:
+                        success, frag_content = self._download_fragment(
+                            ctx, frag_url, info_dict, headers)
+                        if not success:
+                            return False, frag_index
+                        break
+                    except compat_urllib_error.HTTPError as err:
+                        # Unavailable (possibly temporary) fragments may be served.
+                        # First we try to retry, then either skip or abort.
+                        # See https://github.com/ytdl-org/youtube-dl/issues/10165 and
+                        # https://github.com/ytdl-org/youtube-dl/issues/10448.
+                        count += 1
+                        if count <= fragment_retries:
+                            self.report_retry_fragment(err, frag_index, count, fragment_retries)
+                if count > fragment_retries:
+                    return False, frag_index
+
+                if decrypt_info['METHOD'] == 'AES-128':
+                    iv = decrypt_info.get('IV') or compat_struct_pack('>8xq', media_sequence)
+                    decrypt_info['KEY'] = decrypt_info.get('KEY') or self.ydl.urlopen(
+                        self._prepare_url(info_dict, info_dict.get('_decryption_key_url') or decrypt_info['URI'])).read()
+                    # Don't decrypt the content in tests since the data is explicitly truncated and it's not to a valid block
+                    # size (see https://github.com/ytdl-org/youtube-dl/pull/27660). Tests only care that the correct data downloaded,
+                    # not what it decrypts to.
+                    if not test:
+                        frag_content = AES.new(
+                            decrypt_info['KEY'], AES.MODE_CBC, iv).decrypt(frag_content)
+
+                return frag_content, frag_index
+
+            def append_fragment(frag_content, frag_index):
+                if frag_content:
+                    fragment_filename = '%s-Frag%d' % (ctx['tmpfilename'], frag_index)
+                    try:
+                        file, frag_sanitized = sanitize_open(fragment_filename, 'rb')
+                        ctx['fragment_filename_sanitized'] = frag_sanitized
+                        file.close()
+                        self._append_fragment(ctx, frag_content)
+                        return True
+                    except FileNotFoundError:
+                        if skip_unavailable_fragments:
+                            self.report_skip_fragment(frag_index)
+                            return True
+                        else:
+                            self.report_error(
+                                'fragment %s not found, unable to continue' % frag_index)
+                            return False
+                else:
+                    if skip_unavailable_fragments:
+                        self.report_skip_fragment(frag_index)
+                        return True
+                    else:
+                        self.report_error(
+                            'fragment %s not found, unable to continue' % frag_index)
+                        return False
+
+            max_workers = self.params.get('concurrent_fragment_downloads', 1)
+            if can_threaded_download and max_workers > 1:
+                self.report_warning('The download speed shown is only of one thread. This is a known issue')
+                with concurrent.futures.ThreadPoolExecutor(max_workers) as pool:
+                    futures = [pool.submit(download_fragment, fragment) for fragment in fragments]
+                    # timeout must be 0 to return instantly
+                    done, not_done = concurrent.futures.wait(futures, timeout=0)
+                    try:
+                        while not_done:
+                            # Check every 1 second for KeyboardInterrupt
+                            freshly_done, not_done = concurrent.futures.wait(not_done, timeout=1)
+                            done |= freshly_done
+                    except KeyboardInterrupt:
+                        for future in not_done:
+                            future.cancel()
+                        # timeout must be None to cancel
+                        concurrent.futures.wait(not_done, timeout=None)
+                        raise KeyboardInterrupt
+                results = [future.result() for future in futures]
+
+                for frag_content, frag_index in results:
+                    result = append_fragment(frag_content, frag_index)
+                    if not result:
+                        return False
+            else:
+                for fragment in fragments:
+                    frag_content, frag_index = download_fragment(fragment)
+                    result = append_fragment(frag_content, frag_index)
+                    if not result:
+                        return False
+
             self._finish_frag_download(ctx)
         return True
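
For AES-128 streams, when the playlist's #EXT-X-KEY does not carry an IV,
the IV defaults to the fragment's media sequence number as a big-endian
128-bit value (per RFC 8216), which is what the
compat_struct_pack('>8xq', media_sequence) call above produces. An
equivalent standalone check:

    import struct

    media_sequence = 5
    iv = struct.pack('>8xq', media_sequence)  # 8 zero bytes + signed 64-bit int
    assert iv == media_sequence.to_bytes(16, 'big')
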
diff --git a/yt_dlp/options.py b/yt_dlp/options.py
index adef0e0a8..1e995b490 100644
--- a/yt_dlp/options.py
+++ b/yt_dlp/options.py
@@ -558,6 +558,10 @@ def parseOpts(overrideArguments=None):
         help='Languages of the subtitles to download (optional) separated by commas, use --list-subs for available language tags')
 
     downloader = optparse.OptionGroup(parser, 'Download Options')
+    downloader.add_option(
+        '-N', '--concurrent-fragments',
+        dest='concurrent_fragment_downloads', metavar='N', default=1, type=int,
+        help='Number of fragments to download concurrently (default is %default)')
     downloader.add_option(
         '-r', '--limit-rate', '--rate-limit',
         dest='ratelimit', metavar='RATE',
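
With the patch applied, concurrency is opt-in; for example (URL
illustrative):

    yt-dlp -N 4 'https://example.com/master.m3u8'

`-N 1` (the default) keeps the previous sequential behaviour, and external
downloaders selected through _get_real_downloader are handed the fragment
list instead, as before.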