This repository has been archived on 2023-11-11. You can view files and clone it, but cannot push or open issues or pull requests.
automated-youtube-dl/process/threads.py

280 lines
10 KiB
Python

import math
import os
import random
import subprocess
import time
import traceback
from pathlib import Path
import numpy as np
import yt_dlp as ydl_ydl
from hurry.filesize import size
from tqdm.auto import tqdm
from unidecode import unidecode
import ydl.yt_dlp as ydl
from process.funcs import remove_special_chars_linux, setup_file_logger
class ytdl_logger(object):
    """Logger adapter passed to yt-dlp through its ``logger`` option.

    Messages are forwarded to the wrapped ``logger`` when one was supplied;
    without one the adapter is silent. Every error message is additionally
    collected in ``self.errors`` so the caller can report it afterwards.
    """

    def __init__(self, logger=None):
        """
        :param logger: object exposing ``info``/``warning``/``error``, or None
            to discard all output.
        """
        self.logger = logger
        # Per-instance list. A class-level list would be shared by every
        # instance, leaking the errors of one download into the next one.
        self.errors = []

    def debug(self, msg):
        # Debug output is deliberately forwarded at INFO level
        # (presumably because yt-dlp sends its regular output through debug()).
        if self.logger:
            self.logger.info(msg)

    def info(self, msg):
        if self.logger:
            self.logger.info(msg)

    def warning(self, msg):
        if self.logger:
            self.logger.warning(msg)

    def error(self, msg):
        if self.logger:
            self.logger.error(msg)
        # Always remember the error, even when running as a silent dummy logger.
        self.errors.append(msg)
def is_manager_lock_locked(lock) -> bool:
    """
    Report whether ``lock`` is currently held, without waiting for it.

    The probe is a non-blocking acquire; if it succeeds the lock was free and
    is released again straight away. Note that the keyword accepted by
    Manager().Lock().acquire() is ``blocking``, not ``block``.
    """
    if lock.acquire(blocking=False):
        # We got it, so nobody else was holding it: hand it straight back.
        lock.release()
        return False
    return True
# Longest file name allowed on the root filesystem, minus 30 characters of
# headroom (presumably for extensions/suffixes appended later - TODO confirm).
# os.pathconf() is the same query `getconf NAME_MAX /` performs, without
# spawning a shell at import time.
name_max = int(os.pathconf('/', 'PC_NAME_MAX')) - 30
def _acquire_bar_slot(bars):
    """Wait until one of the progress-bar slots is free and claim it.

    Each entry of ``bars`` is indexed as ``(position, lock)``. Returns the
    ``(position, lock)`` pair of the claimed slot; the caller owns the lock
    and must release it.
    """
    while True:
        for item in bars:
            if item[1].acquire(timeout=0.01):
                return item[0], item[1]
            # Slot is busy: back off for a random moment before trying the next one.
            time.sleep(random.uniform(0.1, 0.5))


def _make_progress_hook(bar):
    """Build a yt-dlp progress hook that mirrors download progress onto ``bar``."""

    def progress_hook(d):
        # Variables can be None if the download hasn't started yet.
        if d['status'] != 'downloading' or not d.get('downloaded_bytes'):
            return
        # We want total_bytes but it may not exist so total_bytes_estimate is good too.
        total = d.get('total_bytes') or d.get('total_bytes_estimate')
        if total:
            downloaded_bytes = int(d['downloaded_bytes'])
            if total > 0:
                percent = (downloaded_bytes / total) * 100
                # The bar runs from 0 to 100, so advance it by the rounded delta.
                bar.update(int(np.round(percent - bar.n)))
            bar.set_postfix({
                'speed': d['_speed_str'],
                'size': f"{size(d.get('downloaded_bytes'))}/{size(total)}",
            })
        else:
            # No usable total: fall back to the pre-formatted strings.
            bar.set_postfix({
                'speed': d['_speed_str'],
                'size': f"{d['_downloaded_bytes_str'].strip()}/{d['_total_bytes_str'].strip()}",
            })

    return progress_hook


def _render_filename(video, title):
    """Render the extension-less output file name for ``video`` using ``title``."""
    return remove_special_chars_linux(
        ydl.get_output_templ(
            video_id=video['id'],
            title=title,
            uploader=video['uploader'],
            uploader_id=video['uploader_id'],
            include_ext=False
        ), special_chars=['/'])


def download_video(args) -> dict:
    """Download a single video and report the outcome.

    :param args: two-item sequence ``(video, kwargs)``. ``video`` is the info
        dict of the video to fetch. ``kwargs`` is read for the keys
        ``ignore_downloaded``, ``bars`` (sequence of ``(position, lock)``
        progress-bar slots, may be empty), ``ydl_opts`` (options passed to
        yt-dlp) and ``output_dir``.
    :return: dict with the keys ``downloaded_video_id`` (the video id on
        success, otherwise None), ``video_id``, ``video_critical_err_msg``,
        ``video_critical_err_msg_short``, ``status_msg`` and ``logger_msg``.

    Exceptions raised while fetching info or downloading are caught and
    reported through ``video_critical_err_msg``.
    """
    video = args[0]
    kwargs = args[1]
    output_dict = {'downloaded_video_id': None, 'video_id': video['id'], 'video_critical_err_msg': [], 'video_critical_err_msg_short': [], 'status_msg': [], 'logger_msg': []}  # empty object

    # NOTE(review): `and` binds tighter than `or`, so the original unparenthesized
    # condition is evaluated exactly as grouped below. Confirm this grouping is
    # the intended one.
    missing_channel_info = (
        (not kwargs['ignore_downloaded'] and not video['channel_id'])
        or not video['channel']
        or not video['channel_url']
    )
    # Sometimes videos don't have channel_id, channel, or channel_url but are
    # actually valid (like shorts), so a duration or an integer view count
    # still lets them through.
    if missing_channel_info and not (video['duration'] or isinstance(video['view_count'], int)):
        output_dict['video_critical_err_msg_short'].append('unavailable.')
        return output_dict

    # Transliterate non-ASCII characters in the title.
    video['title'] = unidecode(video['title'])

    # Work on a private copy so per-video settings (hooks, logger, outtmpl)
    # never leak into the options dict supplied by the caller.
    ydl_opts = dict(kwargs['ydl_opts'])

    bar = None
    bar_lock = None
    if len(kwargs['bars']):
        bar_offset, bar_lock = _acquire_bar_slot(kwargs['bars'])

    try:
        if bar_lock is not None:
            # The description gets a quarter of the terminal width.
            desc_with = int(np.round(os.get_terminal_size()[0] * (1 / 4)))
            bar = tqdm(total=100, position=bar_offset, desc=f"{video['id']} - {video['title']}".ljust(desc_with)[:desc_with], bar_format='{l_bar}{bar}| {elapsed}<{remaining}{postfix}', leave=False)
            ydl_opts['progress_hooks'] = [_make_progress_hook(bar)]

        start_time = time.time()
        try:
            ydl_opts['logger'] = ytdl_logger()  # dummy silent logger
            yt_dlp = ydl.YDL(ydl_opts)
            video_n = yt_dlp.get_info(video['url'])
            if not video_n:
                output_dict['video_critical_err_msg_short'].append('failed to get info. Unavailable?')
                return output_dict

            # Continue with the freshly fetched info dict, keeping the original URL.
            video_n['url'] = video['url']
            video = video_n
            del video_n

            # We created a new dict, so transliterate again.
            video['title'] = unidecode(video['title'])
            video['uploader'] = unidecode(video['uploader'])  # now this info is present since we fetched it

            # TODO: do we also need to remove the @ char?
            video_filename = _render_filename(video, video['title'])

            # Make sure the video title isn't too long (-3 leaves room for the '...').
            # Stop once the title is exhausted so an over-long uploader or id
            # cannot make this loop forever.
            while len(video_filename) >= name_max - 3 and video['title']:
                video['title'] = video['title'][:-1]
                video_filename = _render_filename(video, video['title'] + '...')

            base_path = str(Path(kwargs['output_dir'], video_filename))
            ydl_opts['outtmpl'] = f"{base_path}.%(ext)s"

            # The real download logs to a file next to the video.
            ylogger = ytdl_logger(setup_file_logger(video['id'], base_path + '.log'))
            ydl_opts['logger'] = ylogger
            with ydl_ydl.YoutubeDL(ydl_opts) as y:
                error_code = y.download(video['url'])

            if not error_code:
                elapsed = round(math.ceil(time.time() - start_time) / 60, 2)
                output_dict['logger_msg'].append(f"'{video['title']}' - Downloaded in {elapsed} min.")
                output_dict['downloaded_video_id'] = video['id']
            else:
                output_dict['video_critical_err_msg'] = output_dict['video_critical_err_msg'] + ylogger.errors
        except Exception:
            output_dict['video_critical_err_msg'].append(f"EXCEPTION -> {traceback.format_exc()}")
            if bar is not None:
                bar.update(100 - bar.n)
    finally:
        # Always free the bar slot, whatever happened above, so other workers
        # waiting for a slot are never starved.
        if bar is not None:
            bar.close()
        if bar_lock is not None:
            bar_lock.release()
    return output_dict
def bar_eraser(video_bars, eraser_exit):
    """Keep blanking the terminal rows of idle progress-bar slots.

    :param video_bars: sequence whose entries are indexed as ``(position, lock)``.
    :param eraser_exit: object with a ``value`` attribute; the loop stops as
        soon as it becomes truthy.

    A slot is only wiped while its lock can be acquired, i.e. while no
    download is using it.
    """
    while not eraser_exit.value:
        for item in video_bars:
            if eraser_exit.value:
                return
            position, bar_lock = item[0], item[1]
            if not bar_lock.acquire(timeout=0.1):
                continue  # slot is in use, leave its bar alone
            try:
                # '\x1b[2K' is the ANSI "erase entire line" sequence.
                bar = tqdm(position=position, leave=False, bar_format='\x1b[2K')
                bar.close()
            finally:
                # Release even if drawing failed so the slot stays usable.
                bar_lock.release()
# Old queue and queue-processor thread implementation (disabled, kept for reference):
# manager = Manager()
# queue = manager.dict()
# queue_lock = manager.Lock()
# def eraser():
# nonlocal queue
# try:
# while not eraser_exit.value:
# for i in queue.keys():
# if eraser_exit.value:
# return
# i = int(i)
# lock = video_bars[i][1].acquire(timeout=0.1)
# bar_lock = video_bars[i][1]
# if lock:
# bar = tqdm(position=video_bars[i][0], leave=False, bar_format='\x1b[2K')
# bar.close()
# with queue_lock:
# del queue_dict[i]
# queue = queue_dict
# bar_lock.release()
# except KeyboardInterrupt:
# sys.exit(0)
# except multiprocessing.managers.RemoteError:
# sys.exit(0)
# except SystemExit:
# sys.exit(0)
#
# try:
# Thread(target=eraser).start()
# while not eraser_exit.value:
# for i, item in enumerate(video_bars):
# if eraser_exit.value:
# return
# # Add bars to the queue
# if is_manager_lock_locked(item[1]):
# with queue_lock:
# queue_dict = queue
# queue_dict[i] = True
# queue = queue_dict
# except KeyboardInterrupt:
# sys.exit(0)
# except multiprocessing.managers.RemoteError:
# sys.exit(0)
# except SystemExit:
# sys.exit(0)
class ServiceExit(Exception):
    """
    Raised to request a clean shutdown: all running threads and the
    main program are expected to exit when this exception is triggered.
    """