import math import multiprocessing import os import subprocess import sys import time import traceback from multiprocessing import Manager from pathlib import Path from threading import Thread import numpy as np from tqdm.auto import tqdm from unidecode import unidecode import ydl.yt_dlp as ydl from process.funcs import remove_special_chars_linux, setup_file_logger class ytdl_logger(object): errors = [] def __init__(self, logger=None): self.logger = logger # logging.basicConfig(level=logging.DEBUG) # self.logger = logging # self.logger.info('testlog') def debug(self, msg): if self.logger: self.logger.info(msg) def info(self, msg): if self.logger: self.logger.info(msg) def warning(self, msg): if self.logger: self.logger.warning(msg) def error(self, msg): if self.logger: self.logger.error(msg) self.errors.append(msg) def is_manager_lock_locked(lock) -> bool: """ Manager().Lock().aquire() takes blocking, not block. """ locked = lock.acquire(blocking=False) if not locked: return True else: lock.release() return False name_max = int(subprocess.check_output("getconf NAME_MAX /", shell=True).decode()) - 30 def download_video(args) -> dict: # Sleep for a little bit to space out the rush of workers flooding the bar locks. # time.sleep(random.randint(1, 20) / 1000) def progress_hook(d): # downloaded_bytes and total_bytes can be None if the download hasn't started yet. if d['status'] == 'downloading': if d.get('downloaded_bytes') and d.get('total_bytes'): downloaded_bytes = int(d['downloaded_bytes']) total_bytes = int(d['total_bytes']) if total_bytes > 0: percent = (downloaded_bytes / total_bytes) * 100 bar.update(int(np.round(percent - bar.n))) # If the progress bar doesn't end at 100% then round to 1 decimal place bar.set_postfix({ 'speed': d['_speed_str'], 'size': f"{d['_downloaded_bytes_str'].strip()}/{d['_total_bytes_str'].strip()}", }) video = args[0] kwargs = args[1] # Clean the strings of forign languages video['title'] = unidecode(video['title']) video['uploader'] = unidecode(video['uploader']) output_dict = {'downloaded_video_id': None, 'video_id': video['id'], 'video_critical_err_msg': [], 'video_critical_err_msg_short': [], 'status_msg': [], 'logger_msg': []} # empty object if not kwargs['ignore_downloaded'] and not video['channel_id'] or not video['channel'] or not video['channel_url']: if video['duration'] or isinstance(video['view_count'], int): # Sometimes videos don't have channel_id, channel, or channel_url but are actually valid. Like shorts. pass else: output_dict['video_critical_err_msg_short'].append('unavailable.') return output_dict # Get a bar locked = False if len(kwargs['bars']): while not locked: # We're going to wait until a bar is available for us to use. for item in kwargs['bars']: if not is_manager_lock_locked(item[1]): locked = item[1].acquire(timeout=0.01) # get the lock ASAP and don't wait if we didn't get it. offset = item[0] bar_lock = item[1] break kwargs['ydl_opts']['progress_hooks'] = [progress_hook] desc_with = int(np.round(os.get_terminal_size()[0] * (1 / 4))) bar = tqdm(total=100, position=offset, desc=f"{video['id']} - {video['title']}".ljust(desc_with)[:desc_with], bar_format='{l_bar}{bar}| {elapsed}<{remaining}{postfix}', leave=False) start_time = time.time() try: kwargs['ydl_opts']['logger'] = ytdl_logger() # dummy silent logger yt_dlp = ydl.YDL(kwargs['ydl_opts']) video_n = yt_dlp.get_info(video['url']) if not video_n: output_dict['video_critical_err_msg_short'].append('failed to get info. Unavailable?') return output_dict video_n['url'] = video['url'] video = video_n del video_n # We created a new dict video['title'] = unidecode(video['title']) video['uploader'] = unidecode(video['uploader']) video_filename = remove_special_chars_linux(ydl.get_output_templ(video_id=video['id'], title=video['title'], uploader=video['uploader'], uploader_id=video['uploader_id'], include_ext=False), special_chars=['/']) # Make sure the video title isn't too long while len(video_filename) >= name_max - 3: # -3 so that I can add ... video['title'] = video['title'][:-1] video_filename = remove_special_chars_linux( ydl.get_output_templ( video_id=video['id'], title=video['title'] + '...', uploader=video['uploader'], uploader_id=video['uploader_id'], include_ext=False ), special_chars=['/']) base_path = str(Path(kwargs['output_dir'], video_filename)) kwargs['ydl_opts']['outtmpl'] = f"{base_path}.%(ext)s" # try: # base_path = os.path.splitext(Path(kwargs['output_dir'], yt_dlp.prepare_filename(video)))[0] # except AttributeError: # # Sometimes we won't be able to pull the video info so just use the video's ID. # base_path = kwargs['output_dir'] / video['id'] ylogger = ytdl_logger(setup_file_logger(video['id'], base_path + '.log')) kwargs['ydl_opts']['logger'] = ylogger yt_dlp = ydl.YDL(kwargs['ydl_opts']) # recreate the object with the correct logging path error_code = yt_dlp(video['url']) # Do the download if not error_code: elapsed = round(math.ceil(time.time() - start_time) / 60, 2) output_dict['logger_msg'].append(f"'{video['title']}' - Downloaded in {elapsed} min.") output_dict['downloaded_video_id'] = video['id'] else: output_dict['video_critical_err_msg'] = output_dict['video_critical_err_msg'] + ylogger.errors except Exception: output_dict['video_critical_err_msg'].append(f"EXCEPTION -> {traceback.format_exc()}") if locked: bar.update(100 - bar.n) if locked: bar.close() bar_lock.release() return output_dict def bar_eraser(video_bars, eraser_exit): manager = Manager() queue = manager.dict() queue_lock = manager.Lock() def eraser(): nonlocal queue try: while not eraser_exit.value: for i in queue.keys(): if eraser_exit.value: return i = int(i) lock = video_bars[i][1].acquire(timeout=0.1) bar_lock = video_bars[i][1] if lock: bar = tqdm(position=video_bars[i][0], leave=False, bar_format='\x1b[2K') bar.close() with queue_lock: del queue_dict[i] queue = queue_dict bar_lock.release() except KeyboardInterrupt: sys.exit(0) except multiprocessing.managers.RemoteError: sys.exit(0) except SystemExit: sys.exit(0) try: Thread(target=eraser).start() while not eraser_exit.value: for i, item in enumerate(video_bars): if eraser_exit.value: return if is_manager_lock_locked(item[1]): with queue_lock: queue_dict = queue queue_dict[i] = True queue = queue_dict except KeyboardInterrupt: sys.exit(0) except multiprocessing.managers.RemoteError: sys.exit(0) except SystemExit: sys.exit(0) class ServiceExit(Exception): """ Custom exception which is used to trigger the clean exit of all running threads and the main program. """ pass