This repository has been archived on 2023-11-11. You can view files and clone it, but cannot push or open issues or pull requests.
automated-youtube-dl/process/threads.py

239 lines
8.7 KiB
Python

import math
import multiprocessing
import os
import subprocess
import sys
import time
import traceback
from multiprocessing import Manager
from pathlib import Path
from threading import Thread
import numpy as np
import yt_dlp as ydl_ydl
from hurry.filesize import size
from tqdm.auto import tqdm
from unidecode import unidecode
import ydl.yt_dlp as ydl
from process.funcs import remove_special_chars_linux, setup_file_logger
class ytdl_logger:
    """
    Logger adapter handed to yt-dlp through the ``logger`` option.

    Messages are forwarded to the wrapped `logger` when one was given; without
    one the instance is a silent sink. Every error message is additionally kept
    in `self.errors` so the caller can report why a download failed.
    """

    def __init__(self, logger=None):
        """
        :param logger: object exposing ``info``, ``warning`` and ``error``
                       methods, or None to discard all output.
        """
        self.logger = logger
        # One list per instance. A class-level list would be shared by every
        # logger, so errors from one video would show up in the report of the next.
        self.errors = []

    def debug(self, msg):
        """Forward a debug message to the wrapped logger at info level."""
        if self.logger:
            self.logger.info(msg)

    def info(self, msg):
        """Forward an informational message to the wrapped logger."""
        if self.logger:
            self.logger.info(msg)

    def warning(self, msg):
        """Forward a warning to the wrapped logger."""
        if self.logger:
            self.logger.warning(msg)

    def error(self, msg):
        """Forward an error to the wrapped logger and remember it in `self.errors`."""
        if self.logger:
            self.logger.error(msg)
        # Recorded even without a wrapped logger so the message is never lost.
        self.errors.append(msg)
def is_manager_lock_locked(lock) -> bool:
    """
    Tell whether `lock` is currently held, without waiting on it.

    The lock is probed with a non-blocking acquire and, if that succeeds,
    released again immediately.
    Note: Manager().Lock().acquire() takes `blocking`, not `block`.
    """
    acquired = lock.acquire(blocking=False)
    if acquired:
        # Nobody was holding it; we only wanted to look, so give it back.
        lock.release()
        return False
    return True
def _filesystem_name_max(path: str = '/', default: int = 255) -> int:
    """
    Return the longest file name allowed on the filesystem holding `path`.

    Asks the OS directly through os.pathconf() instead of spawning `getconf`
    in a shell. Falls back to `default` when the limit cannot be determined
    (os.pathconf missing on this platform, unknown name, OS error, or an
    indeterminate limit).
    """
    try:
        limit = int(os.pathconf(path, 'PC_NAME_MAX'))
    except (AttributeError, OSError, ValueError):
        return default
    return limit if limit > 0 else default


# Longest file name this module will generate: the limit for "/" minus 30.
# NOTE(review): the 30 held back is presumably room for extensions/suffixes added later -- confirm.
name_max = _filesystem_name_max() - 30
def download_video(args) -> dict:
    """
    Download one video with yt-dlp and describe the outcome in a dict.

    :param args: two-item sequence ``(video, kwargs)``.
        ``video`` is a dict read for the keys ``id``, ``url``, ``title``,
        ``channel_id``, ``channel``, ``channel_url``, ``duration`` and
        ``view_count``; its ``title`` is rewritten in place.
        ``kwargs`` is a dict read for ``ignore_downloaded``, ``bars`` (sequence
        of ``(position, lock)`` items, may be empty), ``ydl_opts`` (yt-dlp
        options, copied before use) and ``output_dir``.
    :return: dict with the keys ``downloaded_video_id`` (set only on success),
        ``video_id``, ``video_critical_err_msg``, ``video_critical_err_msg_short``,
        ``status_msg`` and ``logger_msg``.

    Exceptions raised while fetching info or downloading are caught and
    reported through ``video_critical_err_msg``. A claimed progress-bar slot is
    always closed and unlocked before returning.
    """
    video = args[0]
    kwargs = args[1]
    # Private shallow copy: the per-video logger, outtmpl and progress hook must
    # not leak back into the options dict owned by the caller.
    ydl_opts = dict(kwargs['ydl_opts'])
    bar = None  # tqdm bar, created only once a bar slot has been claimed.

    def progress_hook(d):
        """Mirror yt-dlp's progress report `d` onto our tqdm bar."""
        # Values can be None if the download hasn't started yet.
        if d['status'] != 'downloading':
            return
        total = None
        if d.get('downloaded_bytes'):
            # We want total_bytes but it may not exist, so total_bytes_estimate is good too.
            total = d.get('total_bytes') or d.get('total_bytes_estimate')
        # Display-only fields are read with .get() so that a missing key cannot
        # raise from inside the hook.
        speed = d.get('_speed_str', 'N/A')
        if total:
            downloaded_bytes = int(d['downloaded_bytes'])
            if total > 0:
                percent = (downloaded_bytes / total) * 100
                # The bar counts whole percent; advance it by the rounded difference.
                bar.update(int(np.round(percent - bar.n)))
            bar.set_postfix({
                'speed': speed,
                'size': f"{size(d.get('downloaded_bytes'))}/{size(total)}",
            })
        else:
            done_str = str(d.get('_downloaded_bytes_str', 'N/A')).strip()
            total_str = str(d.get('_total_bytes_str', 'N/A')).strip()
            bar.set_postfix({
                'speed': speed,
                'size': f"{done_str}/{total_str}",
            })

    output_dict = {
        'downloaded_video_id': None,
        'video_id': video['id'],
        'video_critical_err_msg': [],
        'video_critical_err_msg_short': [],
        'status_msg': [],
        'logger_msg': [],
    }

    # NOTE(review): `and` binds tighter than `or`, so `ignore_downloaded` only
    # guards the channel_id test. The grouping below is exactly what the original
    # un-parenthesised expression evaluated to -- confirm this is intended.
    missing_channel_info = (
        (not kwargs['ignore_downloaded'] and not video['channel_id'])
        or not video['channel']
        or not video['channel_url']
    )
    # Sometimes videos don't have channel_id, channel, or channel_url but are
    # actually valid (like shorts), so a duration or an integer view count
    # still lets them through.
    if missing_channel_info and not (video['duration'] or isinstance(video['view_count'], int)):
        output_dict['video_critical_err_msg_short'].append('unavailable.')
        return output_dict

    # Transliterate foreign characters in the title.
    video['title'] = unidecode(video['title'])

    # Claim a progress-bar slot: spin until one of the bar locks is ours.
    locked = False
    offset = None
    bar_lock = None
    if len(kwargs['bars']):
        while not locked:  # Wait until a bar is available for us to use.
            for item in kwargs['bars']:
                if not is_manager_lock_locked(item[1]):
                    # Get the lock ASAP and don't wait if we didn't get it.
                    locked = item[1].acquire(timeout=0.01)
                    offset = item[0]
                    bar_lock = item[1]
                    break

    start_time = time.time()
    try:
        if locked:
            ydl_opts['progress_hooks'] = [progress_hook]
            desc_width = int(np.round(os.get_terminal_size()[0] * (1 / 4)))
            bar = tqdm(
                total=100,
                position=offset,
                desc=f"{video['id']} - {video['title']}".ljust(desc_width)[:desc_width],
                bar_format='{l_bar}{bar}| {elapsed}<{remaining}{postfix}',
                leave=False,
            )

        ydl_opts['logger'] = ytdl_logger()  # dummy silent logger
        yt_dlp = ydl.YDL(ydl_opts)
        video_n = yt_dlp.get_info(video['url'])
        if not video_n:
            output_dict['video_critical_err_msg_short'].append('failed to get info. Unavailable?')
            # The `finally` below closes the bar and releases its lock.
            return output_dict

        # From here on work with the freshly fetched info dict.
        video_n['url'] = video['url']
        video = video_n
        del video_n
        video['title'] = unidecode(video['title'])
        video['uploader'] = unidecode(video['uploader'])  # now this info is present since we fetched it

        def build_filename(title):
            """Render the output file name (without extension) for `title`."""
            return remove_special_chars_linux(
                ydl.get_output_templ(
                    video_id=video['id'],
                    title=title,
                    uploader=video['uploader'],
                    uploader_id=video['uploader_id'],
                    include_ext=False,
                ), special_chars=['/'])

        video_filename = build_filename(video['title'])
        # Make sure the file name isn't too long: drop one title character at a
        # time (-3 leaves room for the appended '...'). Stop once the title is
        # empty, otherwise a too-long uploader/id would loop forever.
        while len(video_filename) >= name_max - 3 and video['title']:
            video['title'] = video['title'][:-1]
            video_filename = build_filename(video['title'] + '...')

        base_path = str(Path(kwargs['output_dir'], video_filename))
        ydl_opts['outtmpl'] = f"{base_path}.%(ext)s"

        ylogger = ytdl_logger(setup_file_logger(video['id'], base_path + '.log'))
        # Give this logger its own error list so only this video's errors are reported.
        ylogger.errors = []
        ydl_opts['logger'] = ylogger
        with ydl_ydl.YoutubeDL(ydl_opts) as y:
            error_code = y.download([video['url']])

        if not error_code:
            elapsed = round(math.ceil(time.time() - start_time) / 60, 2)
            output_dict['logger_msg'].append(f"'{video['title']}' - Downloaded in {elapsed} min.")
            output_dict['downloaded_video_id'] = video['id']
        else:
            output_dict['video_critical_err_msg'] = output_dict['video_critical_err_msg'] + ylogger.errors
    except Exception:
        output_dict['video_critical_err_msg'].append(f"EXCEPTION -> {traceback.format_exc()}")
        if bar is not None:
            bar.update(100 - bar.n)  # fill the bar so it does not look stalled
    finally:
        # Always hand the bar slot back, whatever path left the try block.
        if bar is not None:
            bar.close()
        if locked:
            bar_lock.release()
    return output_dict
def bar_eraser(video_bars, eraser_exit):
    """
    Wipe the terminal line of progress bars that are no longer in use.

    The calling thread keeps scanning `video_bars` and notes the index of every
    bar whose lock is currently held. A helper thread walks those noted indexes
    and, as soon as it can take a bar's lock (the bar is free again), prints an
    erase-line sequence at the bar's position and forgets the index.

    :param video_bars: sequence of ``(position, lock)`` items.
    :param eraser_exit: object whose truthy ``.value`` tells both loops to stop.
                        NOTE(review): presumably a multiprocessing/Manager Value -- confirm.

    KeyboardInterrupt, a manager RemoteError or SystemExit in either loop ends
    it through ``sys.exit(0)``.
    """
    manager = Manager()
    queue = manager.dict()  # bar index -> True, for bars that were seen in use
    queue_lock = manager.Lock()
    # Everything that should end the loops quietly instead of printing a traceback.
    quiet_exits = (KeyboardInterrupt, multiprocessing.managers.RemoteError, SystemExit)

    def eraser():
        """Clear the line of each queued bar once its lock can be taken."""
        try:
            while not eraser_exit.value:
                for i in queue.keys():
                    if eraser_exit.value:
                        return
                    i = int(i)
                    bar_lock = video_bars[i][1]
                    if not bar_lock.acquire(timeout=0.1):
                        continue  # still in use, try again on the next pass
                    try:
                        # A throwaway bar whose whole format is "erase line".
                        bar = tqdm(position=video_bars[i][0], leave=False, bar_format='\x1b[2K')
                        bar.close()
                        with queue_lock:
                            queue.pop(i, None)
                    finally:
                        # Never keep a bar slot locked, even if the write failed.
                        bar_lock.release()
        except quiet_exits:
            sys.exit(0)

    try:
        Thread(target=eraser).start()
        while not eraser_exit.value:
            for i, item in enumerate(video_bars):
                if eraser_exit.value:
                    return
                if is_manager_lock_locked(item[1]):
                    with queue_lock:
                        queue[i] = True
    except quiet_exits:
        sys.exit(0)
class ServiceExit(Exception):
    """
    Raised to request a clean shutdown: every running thread
    and the main program are expected to exit when it is seen.
    """