From 58298937eb25ed49a53c15d4a1bee4c100ff2dd0 Mon Sep 17 00:00:00 2001 From: Cyberes Date: Sun, 16 Jul 2023 00:09:00 -0600 Subject: [PATCH] add backend web api --- .gitignore | 1 + config.yml | 60 +++ downloader.py | 589 +++++---------------------- process/threads.py | 279 ------------- requirements.txt | 17 +- {process => server}/__init__.py | 0 server/api/__init__.py | 17 + server/api/api/__init__.py | 1 + server/api/api/api.py | 9 + server/api/create_app.py | 27 ++ server/api/database.py | 3 + server/api/health/__init__.py | 1 + server/api/health/health.py | 20 + server/api/job_tracker.py | 122 ++++++ server/api/jobs/__init__.py | 1 + server/api/jobs/jobs.py | 139 +++++++ server/api/jobs/queue.py | 6 + server/api/list/__init__.py | 1 + server/api/list/lists.py | 52 +++ server/api/list/video_list.py | 13 + server/api/shared.py | 3 + server/background.py | 37 ++ server/health.py | 18 + server/helpers/__init__.py | 0 server/helpers/misc.py | 7 + server/helpers/regex.py | 14 + server/logging.py | 42 ++ server/mysql.py | 207 ++++++++++ server/opts.py | 21 + server/process/__init__.py | 0 {process => server/process}/funcs.py | 6 +- server/process/main.py | 131 ++++++ server/process/mysql.py | 11 + server/process/threads.py | 132 ++++++ server/process/ytlogging.py | 30 ++ server/sql/database.sql | 152 +++++++ ydl/yt_dlp.py | 14 +- 37 files changed, 1415 insertions(+), 768 deletions(-) create mode 100644 config.yml delete mode 100644 process/threads.py rename {process => server}/__init__.py (100%) create mode 100644 server/api/__init__.py create mode 100644 server/api/api/__init__.py create mode 100644 server/api/api/api.py create mode 100644 server/api/create_app.py create mode 100644 server/api/database.py create mode 100644 server/api/health/__init__.py create mode 100644 server/api/health/health.py create mode 100644 server/api/job_tracker.py create mode 100644 server/api/jobs/__init__.py create mode 100644 server/api/jobs/jobs.py create mode 100644 server/api/jobs/queue.py create mode 100644 server/api/list/__init__.py create mode 100644 server/api/list/lists.py create mode 100644 server/api/list/video_list.py create mode 100644 server/api/shared.py create mode 100644 server/background.py create mode 100644 server/health.py create mode 100644 server/helpers/__init__.py create mode 100644 server/helpers/misc.py create mode 100644 server/helpers/regex.py create mode 100644 server/logging.py create mode 100644 server/mysql.py create mode 100644 server/opts.py create mode 100644 server/process/__init__.py rename {process => server/process}/funcs.py (94%) create mode 100644 server/process/main.py create mode 100644 server/process/mysql.py create mode 100644 server/process/threads.py create mode 100644 server/process/ytlogging.py create mode 100644 server/sql/database.sql diff --git a/.gitignore b/.gitignore index 0c25724..4dd7a9a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ .idea targets.* !targets.sample.* +output/ # ---> Python # Byte-compiled / optimized / DLL files diff --git a/config.yml b/config.yml new file mode 100644 index 0000000..2b34156 --- /dev/null +++ b/config.yml @@ -0,0 +1,60 @@ +# Base output directory +base_output: /home/dpanzer/Downloads/output + +# Logs directory. Defaults to base_output/logs +#logs_directory: ./logs + +# Don't update yt-dlp at launch. +no_update: false + +# Max allowed size of a video in MB. +max_size: 1100 + +# Delete the yt-dlp cache on start. +rm_cache_startup: false + +# How many download processes to use. 
+# Default: number of CPU cores
+#threads: 1
+
+# Run in daemon mode. Disables progress bars and sleeps for the amount of time specified in --sleep.
+daemon: false
+
+# How many minutes to sleep when in daemon mode.
+sleep: 60
+
+# The path to the directory to track downloaded videos. Defaults to your appdata path.
+# download_cache_directory: /path/to/cache
+
+# Don't print any error messages to the console.
+silence_errors: false
+
+# Ignore videos that have already been downloaded and disable checks. Let youtube-dl handle everything.
+ignore_downloaded: false
+
+# Erase the tracked video file.
+erase_downloaded_tracker: false
+
+# How many seconds to sleep between items to prevent rate-limiting. Does not affect the time between videos; you should be fine there since it takes a few seconds to merge everything and clean up.
+ratelimit_sleep: 5
+
+# The datatype of the input file. If set to auto, the file will be scanned for a URL on the first line. If it is a URL, the filetype will be set to txt. If it is a key: value pair then the filetype will be set to yaml.
+#input_datatype: auto
+
+# Verbose logging
+verbose: true
+
+# Run ffprobe on the downloaded files.
+#verify: false
+
+mysql:
+  host: 172.0.2.106
+  user: automated_ytdlp
+  password: password1234
+  database: automated_ytdlp
+
+# =====================================================================================================================
+
+# Fine-tuning
+
+# How long to keep the status of a job in memory
+jobs_cleanup_time: 60  # minutes
\ No newline at end of file
diff --git a/downloader.py b/downloader.py
index 4e911fd..a590ca2 100755
--- a/downloader.py
+++ b/downloader.py
@@ -1,508 +1,135 @@
 #!/usr/bin/env python3
 import argparse
-import logging.config
-import math
+import logging
 import os
-import re
-import shutil
 import subprocess
-import sys
-import tempfile
-import time
-from multiprocessing import Manager, Pool, cpu_count
 from pathlib import Path
-from threading import Thread
-
+from typing import Union
 import yaml
 from appdirs import user_data_dir
-from tqdm.auto import tqdm
-from process.funcs import get_silent_logger, remove_duplicates_from_playlist, restart_program, setup_file_logger
-from process.threads import bar_eraser, download_video
+import server.background
+import server.helpers.regex
+from server import opts
+from server.api import shared
+from server.api.job_tracker import JobTracker
+from server.mysql import DatabaseConnection, check_if_database_exists, db_logger, get_console_logger, init_db, test_mysql_connection
 from ydl.files import create_directories, resolve_path
-from ydl.yt_dlp import YDL, update_ytdlp
-
-
-def signal_handler(sig, frame):
-    # TODO: https://www.g-loaded.eu/2016/11/24/how-to-terminate-running-python-threads-using-signals/
-    # raise ServiceExit
-    sys.exit(0)
+
+
+def load_config(path: Union[str, Path]):
+    with open(path, 'r') as file:
+        return yaml.safe_load(file)
 
-# signal.signal(signal.SIGTERM, signal_handler)
-# signal.signal(signal.SIGINT, signal_handler)
+cwd = os.path.dirname(os.path.realpath(__file__))
 
-url_regex = re.compile(r'^(?:http|ftp)s?://'  # http:// or https://
-                       r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|'  # domain...
-                       r'localhost|'  # localhost...
-                       r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'  # ...or ip
-                       r'(?::\d+)?'
# optional port - r'(?:/?|[/?]\S+)$', re.IGNORECASE) -ansi_escape_regex = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument('--config', default=Path(cwd, 'config.yml'), help='Path to the main config file. Default: ./config.yml') + parser.add_argument('--erase-db', action='store_true', help='Erase and reset the database') + parser.add_argument('--force-db-setup', action='store_true', help='Execute the setup sql regardless') + args = parser.parse_args() -parser = argparse.ArgumentParser() -parser.add_argument('file', help='URL to download or path of a file containing the URLs of the videos to download.') -parser.add_argument('--output', required=False, help='Output directory. Ignored paths specified in a YAML file.') -parser.add_argument('--no-update', '-n', action='store_true', help='Don\'t update yt-dlp at launch.') -parser.add_argument('--max-size', type=int, default=1100, help='Max allowed size of a video in MB.') -parser.add_argument('--rm-cache', '-r', action='store_true', help='Delete the yt-dlp cache on start.') -parser.add_argument('--threads', type=int, default=(cpu_count() - 1), - help=f'How many download processes to use. Default: number of CPU cores (for your machine: {cpu_count()}) - 1 = {cpu_count() - 1}') -parser.add_argument('--daemon', '-d', action='store_true', - help="Run in daemon mode. Disables progress bars sleeps for the amount of time specified in --sleep.") -parser.add_argument('--sleep', type=float, default=60, help='How many minutes to sleep when in daemon mode.') -parser.add_argument('--download-cache-file-directory', default=user_data_dir('automated-youtube-dl', 'cyberes'), - help='The path to the directory to track downloaded videos. Defaults to your appdata path.') -parser.add_argument('--silence-errors', '-s', action='store_true', - help="Don't print any error messages to the console.") -parser.add_argument('--ignore-downloaded', '-i', action='store_true', - help='Ignore videos that have been already downloaded and disable checks. Let youtube-dl handle everything.') -parser.add_argument('--erase-downloaded-tracker', '-e', action='store_true', help='Erase the tracked video file.') -parser.add_argument('--ratelimit-sleep', type=int, default=5, - help='How many seconds to sleep between items to prevent rate-limiting. Does not affect time between videos as you should be fine since it takes a few seconds to merge everything and clean up.') -parser.add_argument('--input-datatype', choices=['auto', 'txt', 'yaml'], default='auto', - help='The datatype of the input file. If set to auto, the file will be scanned for a URL on the first line.' - 'If is a URL, the filetype will be set to txt. If it is a key: value pair then the filetype will be set to yaml.') -parser.add_argument('--log-dir', default=None, help='Where to store the logs. 
Must be set when --output is not.') -parser.add_argument('--verbose', '-v', action='store_true') -parser.add_argument('--verify', '-z', action='store_true', help='Run ffprobe on the downloaded files.') -args = parser.parse_args() + temp_logger = get_console_logger(debug=opts.verbose) -if args.threads <= 0: - print("Can't have 0 threads!") - sys.exit(1) + config_path = resolve_path(Path(args.config)) + if not config_path.is_file(): + print('Config file does not exist:', config_path) + quit(1) + config = load_config(config_path) -if args.output: - args.output = resolve_path(args.output) -if args.log_dir: - args.log_dir = resolve_path(args.log_dir) -elif not args.output and not args.log_dir: - args.log_dir = resolve_path(Path(os.getcwd(), 'automated-youtube-dl_logs')) - # print('Must set --log-dir when --output is not.') - # sys.exit(1) -else: - args.log_dir = args.output / 'logs' + opts.base_output = resolve_path(Path(config['base_output'])) + if not opts.base_output.is_dir(): + print('Base output directory does not exist:', opts.base_output) + quit(1) -args.download_cache_file_directory = resolve_path(args.download_cache_file_directory) + if 'download_cache_directory' in config.keys(): + download_cache_directory = resolve_path(Path(config['download_cache_directory'])) + if not download_cache_directory.is_dir(): + print('Download cache directory does not exist:', download_cache_directory) + quit(1) + else: + download_cache_directory = user_data_dir('automated-youtube-dl', 'cyberes') -# TODO: use logging for this -if args.verbose: - print('Cache directory:', args.download_cache_file_directory) + if 'threads' in config.keys(): + opts.threads = config['threads'] + if opts.threads <= 0: + print("Can't have <= 0 threads!") + quit(1) -log_time = time.time() + if 'jobs_cleanup_time' in config.keys(): + opts.jobs_cleanup_time = config['jobs_cleanup_time'] + if opts.jobs_cleanup_time <= 0: + print("jobs_cleanup_time must be greater than 0!") + quit(1) + if 'logs_directory' in config.keys(): + logs_directory = resolve_path(Path(config['logs_directory'])) + if not logs_directory.is_dir(): + print('Logs directory does not exist:', logs_directory) + quit(1) + else: + logs_directory = opts.base_output / 'logs' + create_directories(logs_directory) # create the default path in base_output which should exist. -def load_input_file(): - """ - Get the URLs of the videos to download. Is the input a URL or file? 
- """ - url_list = {} - if not re.match(url_regex, str(args.file)) or args.input_datatype in ('txt', 'yaml'): - args.file = resolve_path(args.file) - if not args.file.exists(): - print('Input file does not exist:', args.file) - sys.exit(1) - input_file = [x.strip().strip('\n') for x in list(args.file.open())] - if args.input_datatype == 'yaml' or (re.match(r'^.*?:\w*', input_file[0]) and args.input_datatype == 'auto'): - with open(args.file, 'r') as file: - try: - url_list = yaml.safe_load(file) - except yaml.YAMLError as e: - print('Failed to load config file, error:', e) - sys.exit(1) - elif args.input_datatype == 'txt' or (re.match(url_regex, input_file[0]) and args.input_datatype == 'auto'): - if not args.output: - args.output = resolve_path(Path(os.getcwd(), 'automated-youtube-dl_output')) - # print('You must specify an output path with --output when the input datatype is a text file.') - # sys.exit(1) - url_list[str(args.output)] = input_file + if config['verbose']: + opts.verbose = True + + opts.mysql = { + 'host': config['mysql']['host'], + 'user': config['mysql']['user'], + 'password': config['mysql']['password'], + 'database': config['mysql']['database'] + } + + temp_logger.info('Connecting to database...') + mysql_success, mysql_error = test_mysql_connection() + if not mysql_success: + print('Failed to connect to MySQL database:', mysql_error) + quit(1) + temp_logger.info('Database connected!') + + if args.erase_db: + prompt = input('Really erase? y/n > ') + if prompt.lower() == 'y': + with DatabaseConnection() as conn: + cursor = conn.cursor() + cursor.execute("show tables;") + result = cursor.fetchall() + for table in result: + t = table[0] + cursor.execute(f'TRUNCATE TABLE {t}') + print(t) + quit() + + db_created = check_if_database_exists() + if not db_created: + temp_logger.info('Setting up database...') + init_db() + temp_logger.info('Database created!') + db_correct, missing_tables = check_if_database_exists(partial=True) + if not db_correct: + if not args.force_db_setup: + temp_logger.fatal(f'Your database is missing tables: {", ".join(missing_tables)}. Please compare your DB to the setup SQL script. Or, try the --force-db-setup arg (not recommended).') + quit(1) else: - print('Unknown file type:', args.input_datatype) - print(input_file) - sys.exit(1) - del input_file # release file object - # Verify each line in the file is a valid URL. 
- # Also resolve the paths - resolved_paths = {} - for directory, urls in url_list.items(): - for item in urls: - if not re.match(url_regex, str(item)): - print(f'Not a url:', item) - sys.exit(1) - resolved_paths[resolve_path(directory)] = urls - url_list = resolved_paths - else: - # They gave us just a URL - if not args.output: - # Set a default path - args.output = resolve_path(Path(os.getcwd(), 'automated-youtube-dl_output')) - # print('You must specify an output path with --output when the input is a URL.') - # sys.exit(1) - url_list[str(args.output)] = [args.file] - return url_list + temp_logger.info('--force-db-setup forced us to set up the database...') + init_db() + # The DB is all set up so now we can log to it + temp_logger.handlers.clear() + del temp_logger + logger = db_logger('MAIN', 'logs', console=True) + logger.setLevel(logging.INFO) -url_list = load_input_file() + if config['rm_cache_startup']: + subprocess.run('yt-dlp --rm-cache-dir', shell=True) + logger.info('Cleared yt-dlp cache') -# Create directories AFTER loading the file -create_directories(*url_list.keys(), args.download_cache_file_directory) + # TODO: print startup variables including the ones above - -def do_update(): - if not args.no_update: - print('Updating yt-dlp...') - updated = update_ytdlp() - if updated: - print('Restarting program...') - restart_program() - else: - print('Up to date.') - - -if args.rm_cache: - subprocess.run('yt-dlp --rm-cache-dir', shell=True) - -# TODO: compress old log files - -if args.daemon: - print('Running in daemon mode.') - -create_directories(args.log_dir) - -# TODO: log file rotation https://www.blog.pythonlibrary.org/2014/02/11/python-how-to-create-rotating-logs/ -# TODO: log to one file instead of one for each run -file_logger = setup_file_logger('youtube_dl', args.log_dir / f'{str(int(log_time))}.log', level=logging.INFO) -video_error_logger = setup_file_logger('video_errors', args.log_dir / f'{int(log_time)}-errors.log', level=logging.INFO) -logger = get_silent_logger('yt-dl', silent=not args.daemon) - - -def log_info_twice(msg): - logger.info(msg) - file_logger.info(ansi_escape_regex.sub('', msg)) - - -log_info_twice('Starting process.') -start_time = time.time() - -manager = Manager() - - -def load_existing_videos(): - # Find existing videos. - output = set() - if not download_archive_file.exists(): - download_archive_file.touch() - with open(download_archive_file, 'r') as file: - output.update(([line.rstrip() for line in file])) - - # Remove duplicate lines. - # Something may have gone wrong in the past so we want to make sure everything is cleaned up. - with open(download_archive_file) as file: - uniqlines = set(file.readlines()) - fd, path = tempfile.mkstemp() - with os.fdopen(fd, 'w') as tmp: - tmp.writelines(set(uniqlines)) - shutil.move(path, download_archive_file) - return output - - -status_bar = tqdm(position=2, bar_format='{desc}', disable=args.daemon, leave=False) - - -def log_bar(log_msg, level): - status_bar.write(f'[{level}] {log_msg}') - if level == 'warning': - logger.warning(log_msg) - elif level == 'error': - logger.error(log_msg) - else: - logger.info(log_msg) - - -# def log_with_video_id(log_msg, video_id, level, logger_obj): -# log_msg = f'{video_id} - {log_msg}' -# if level == 'warning': -# logger_obj.warning(log_msg) -# elif level == 'error': -# logger_obj.error(log_msg) -# else: -# logger_obj.info(log_msg) - - -def print_without_paths(msg): - """ - Remove any filepaths or other stuff we don't want in the message. 
- """ - m = re.match(r'(^[^\/]+(?:\\.[^\/]*)*)', msg) - if m: - msg = m.group(1) - m1 = re.match(r'^(.*?): ', msg) - msg = msg.strip('to "').strip('to: ').strip() - if args.daemon: - log_info_twice(msg) - else: - status_bar.set_description_str(msg) - - -class ytdl_logger(object): - def debug(self, msg): - file_logger.debug(self.__clean_msg(msg)) - # if msg.startswith('[debug] '): - # pass - if '[download]' not in msg: - print_without_paths(msg) - - def info(self, msg): - file_logger.info(self.__clean_msg(msg)) - print_without_paths(msg) - - def warning(self, msg): - file_logger.warning(self.__clean_msg(msg)) - if args.daemon: - logger.warning(msg) - else: - status_bar.write(msg) - - def error(self, msg): - file_logger.error(self.__clean_msg(msg)) - if args.daemon: - logger.error(msg) - else: - status_bar.write(msg) - - def __clean_msg(self, msg): - return ansi_escape_regex.sub('', msg) - - -# TODO: https://github.com/TheFrenchGhosty/TheFrenchGhostys-Ultimate-YouTube-DL-Scripts-Collection/blob/master/docs/Scripts-Type.md#archivist-scripts - -# https://github.com/yt-dlp/yt-dlp#embedding-examples -ydl_opts = { - # TODO: https://github.com/TheFrenchGhosty/TheFrenchGhostys-Ultimate-YouTube-DL-Scripts-Collection/blob/master/docs/Details.md - # https://old.reddit.com/r/DataHoarder/comments/c6fh4x/after_hoarding_over_50k_youtube_videos_here_is/ - 'format': f'(bestvideo[filesize<{args.max_size}M][vcodec^=av01][height>=1080][fps>30]/bestvideo[filesize<{args.max_size}M][vcodec=vp9.2][height>=1080][fps>30]/bestvideo[filesize<{args.max_size}M][vcodec=vp9][height>=1080][fps>30]/bestvideo[filesize<{args.max_size}M][vcodec^=av01][height>=1080]/bestvideo[filesize<{args.max_size}M][vcodec=vp9.2][height>=1080]/bestvideo[filesize<{args.max_size}M][vcodec=vp9][height>=1080]/bestvideo[filesize<{args.max_size}M][height>=1080]/bestvideo[filesize<{args.max_size}M][vcodec^=av01][height>=720][fps>30]/bestvideo[filesize<{args.max_size}M][vcodec=vp9.2][height>=720][fps>30]/bestvideo[filesize<{args.max_size}M][vcodec=vp9][height>=720][fps>30]/bestvideo[filesize<{args.max_size}M][vcodec^=av01][height>=720]/bestvideo[filesize<{args.max_size}M][vcodec=vp9.2][height>=720]/bestvideo[filesize<{args.max_size}M][vcodec=vp9][height>=720]/bestvideo[filesize<{args.max_size}M][height>=720]/bestvideo[filesize<{args.max_size}M])+(bestaudio[acodec=opus]/bestaudio)/best', - 'merge_output_format': 'mkv', - 'logtostderr': True, - 'embedchapters': True, - 'writethumbnail': True, - # Save the thumbnail to a file. Embedding seems to be broken right now so this is an alternative. - 'embedthumbnail': True, - 'embeddescription': True, - 'writesubtitles': True, - # 'allsubtitles': True, # Download every language. 
- 'subtitlesformat': 'vtt', - 'subtitleslangs': ['en'], - 'writeautomaticsub': True, - 'writedescription': True, - 'ignoreerrors': True, - 'continuedl': False, - 'addmetadata': True, - 'writeinfojson': True, - 'verbose': args.verbose, - 'postprocessors': [ - {'key': 'FFmpegEmbedSubtitle'}, - {'key': 'FFmpegMetadata', 'add_metadata': True}, - {'key': 'EmbedThumbnail', 'already_have_thumbnail': True}, - {'key': 'FFmpegThumbnailsConvertor', 'format': 'jpg', 'when': 'before_dl'}, - # {'key': 'FFmpegSubtitlesConvertor', 'format': 'srt'} - ], - # 'external_downloader': 'aria2c', - # 'external_downloader_args': ['-j 32', '-s 32', '-x 16', '--file-allocation=none', '--optimize-concurrent-downloads=true', '--http-accept-gzip=true', '--continue=true'], -} - -yt_dlp = YDL(dict(ydl_opts, **{'logger': ytdl_logger()})) - -url_count = 0 -for k, v in url_list.items(): - for item in v: - url_count += 1 - -# Init bars -video_bars = manager.list() -if not args.daemon: - for i in range(args.threads): - video_bars.append([3 + i, manager.Lock()]) - -encountered_errors = 0 -errored_videos = 0 - -# The video progress bars have an issue where when a bar is closed it -# will shift its position back 1 then return to the correct position. -# This thread will clear empty spots. -if not args.daemon: - eraser_exit = manager.Value(bool, False) - Thread(target=bar_eraser, args=(video_bars, eraser_exit,)).start() - -already_erased_downloaded_tracker = False - -while True: - # do_update() # this doesn't work very well. freezes - progress_bar = tqdm(total=url_count, position=0, desc='Inputs', disable=args.daemon, - bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt}') - for output_path, urls in url_list.items(): - for target_url in urls: - logger.info('Fetching playlist...') - playlist = yt_dlp.playlist_contents(str(target_url)) - - if not playlist: - progress_bar.update() - continue - - url_list = load_input_file() - - download_archive_file = args.download_cache_file_directory / (str(playlist['id']) + '.log') - if args.erase_downloaded_tracker and not already_erased_downloaded_tracker: - if download_archive_file.exists(): - os.remove(download_archive_file) - already_erased_downloaded_tracker = True - downloaded_videos = load_existing_videos() - - msg = f'Found {len(downloaded_videos)} downloaded videos for playlist "{playlist["title"]}" ({playlist["id"]}). {"Ignoring." if args.ignore_downloaded else ""}' - if args.daemon: - logger.info(msg) - else: - progress_bar.write(msg) - download_archive_logger = setup_file_logger('download_archive', download_archive_file, - format_str='%(message)s') - - playlist['entries'] = remove_duplicates_from_playlist(playlist['entries']) - - log_info_twice(f'Downloading item: "{playlist["title"]}" ({playlist["id"]}) {target_url}') - - # Remove already downloaded files from the to-do list. 
- download_queue = [] - for p, video in enumerate(playlist['entries']): - if video['id'] not in download_queue: - if not args.ignore_downloaded and video['id'] not in downloaded_videos: - download_queue.append(video) - # downloaded_videos.add(video['id']) - elif args.ignore_downloaded: - download_queue.append(video) - - playlist_bar = tqdm(total=len(playlist['entries']), position=1, - desc=f'"{playlist["title"]}" ({playlist["id"]})', disable=args.daemon, leave=False) - if not args.ignore_downloaded: - playlist_bar.update(len(downloaded_videos)) - - playlist_ydl_opts = ydl_opts.copy() - # playlist_ydl_opts['outtmpl'] = f'{output_path}/{get_output_templ()}' - - if len(download_queue): # Don't mess with multiprocessing if all videos are already downloaded - with Pool(processes=args.threads) as pool: - if sys.stdout.isatty(): - # Doesn't work if not connected to a terminal: - # OSError: [Errno 25] Inappropriate ioctl for device - status_bar.set_description_str('=' * os.get_terminal_size()[0]) - logger.info('Starting downloads...') - for result in pool.imap_unordered(download_video, - ((video, { - 'bars': video_bars, - 'ydl_opts': playlist_ydl_opts, - 'output_dir': Path(output_path), - 'ignore_downloaded': args.ignore_downloaded, - 'verify': args.verify - }) for video in download_queue)): - # Save the video ID to the file - if result['downloaded_video_id']: - download_archive_logger.info(result['downloaded_video_id']) - - # Print short error messages. - # An error should never be added to both video_critical_err_msg_short and video_critical_err_msg. - for line in result['video_critical_err_msg_short']: - # file_msg = f"{result['video_id']} - {ansi_escape_regex.sub('', line)}" - # term_msg = f"{result['video_id']} - {line}" - msg = f"{result['video_id']} - {line}" - video_error_logger.error(msg) - file_logger.error(msg) - encountered_errors += 1 - if args.daemon: - logger.error(msg) - else: - status_bar.write(msg) - - # Print longer error messages. - # Won't print anything to console if the silence_errors arg is set. - for line in result['video_critical_err_msg']: - # file_msg = f"{result['video_id']} - {ansi_escape_regex.sub('', line)}" - # term_msg = f"{result['video_id']} - {line}" - msg = f"{result['video_id']} - {line}" - video_error_logger.error(msg) - file_logger.error(msg) - encountered_errors += 1 - if not args.silence_errors: - if args.daemon: - logger.error(msg) - else: - status_bar.write(msg) - - # if len(result['video_critical_err_msg']): - # errored_videos += 1 - # if args.silence_errors and args.daemon: - # logger.error(f"{result['video_id']} - Failed due to error.") - - for line in result['logger_msg']: - log_info_twice(f"{result['video_id']} - {line}") - - # TODO: if no error launch a verify multiprocess - # if kwargs['verify']: - # try: - # info = yt_dlp.extract_info(video['url']) - # except Exception as e: - # output_dict['video_critical_err_msg'].append(f'Failed to verify video, extract_info failed: {e}') - # file_path = base_path + info['ext'] - # result = ffprobe(file_path) - # if not result[0]: - # output_dict['video_critical_err_msg'].append(f'Failed to verify video: {result[4]}') - - playlist_bar.update() - else: - msg = f"All videos already downloaded for \"{playlist['title']}\"." 
- if args.daemon: - logger.info(msg) - else: - status_bar.write(msg) - log_info_twice(f"Finished item: '{playlist['title']}' {target_url}") - - # Sleep a bit to prevent rate-limiting - if progress_bar.n < len(url_list.keys()) - 1: - status_bar.set_description_str(f'Sleeping {args.ratelimit_sleep}s...') - time.sleep(args.ratelimit_sleep) - - progress_bar.update() - error_msg = f'Encountered {encountered_errors} errors on {errored_videos} videos.' - if args.daemon: - logger.info(error_msg) - else: - status_bar.write(error_msg) - log_info_twice(f"Finished process in {round(math.ceil(time.time() - start_time) / 60, 2)} min.") - if not args.daemon: - break - else: - logger.info(f'Sleeping for {args.sleep} min.') - try: - time.sleep(args.sleep * 60) - except KeyboardInterrupt: - sys.exit(0) - # downloaded_videos = load_existing_videos() # reload the videos that have already been downloaded - -# Clean up the remaining bars. Have to close them in order. -# These variables may be undefined so we will just ignore any errors. -# Not in one try/catch because we don't want to skip anything. -try: - eraser_exit.value = True -except NameError: - pass -except AttributeError: - pass -try: - playlist_bar.close() -except NameError: - pass -except AttributeError: - pass -try: - status_bar.close() -except NameError: - pass -except AttributeError: - pass + shared.global_job_tracker = JobTracker() + server.background.start() + server.api.start() + logger.info('Startup completed') diff --git a/process/threads.py b/process/threads.py deleted file mode 100644 index b56ac25..0000000 --- a/process/threads.py +++ /dev/null @@ -1,279 +0,0 @@ -import math -import os -import random -import subprocess -import time -import traceback -from pathlib import Path - -import numpy as np -import yt_dlp as ydl_ydl -from hurry.filesize import size -from tqdm.auto import tqdm -from unidecode import unidecode - -import ydl.yt_dlp as ydl -from process.funcs import remove_special_chars_linux, setup_file_logger - - -class ytdl_logger(object): - errors = [] - - def __init__(self, logger=None): - self.logger = logger - # logging.basicConfig(level=logging.DEBUG) - # self.logger = logging - # self.logger.info('testlog') - - def debug(self, msg): - if self.logger: - self.logger.info(msg) - - def info(self, msg): - if self.logger: - self.logger.info(msg) - - def warning(self, msg): - if self.logger: - self.logger.warning(msg) - - def error(self, msg): - if self.logger: - self.logger.error(msg) - self.errors.append(msg) - - -def is_manager_lock_locked(lock) -> bool: - """ - Manager().Lock().aquire() takes blocking, not block. - """ - locked = lock.acquire(blocking=False) - if not locked: - return True - else: - lock.release() - return False - - -name_max = int(subprocess.check_output("getconf NAME_MAX /", shell=True).decode()) - 30 - - -def download_video(args) -> dict: - # Sleep for a little bit to space out the rush of workers flooding the bar locks. - # time.sleep(random.randint(1, 20) / 1000) - - def progress_hook(d): - # Variables can be None if the download hasn't started yet. 
- if d['status'] == 'downloading': - total = None - if d.get('downloaded_bytes'): - # We want total_bytes but it may not exist so total_bytes_estimate is good too - if d.get('total_bytes'): - total = d.get('total_bytes') - elif d.get('total_bytes_estimate'): - total = d.get('total_bytes_estimate') - - if total: - downloaded_bytes = int(d['downloaded_bytes']) - if total > 0: - percent = (downloaded_bytes / total) * 100 - bar.update(int(np.round(percent - bar.n))) # If the progress bar doesn't end at 100% then round to 1 decimal place - bar.set_postfix({ - 'speed': d['_speed_str'], - 'size': f"{size(d.get('downloaded_bytes'))}/{size(total)}", - }) - else: - bar.set_postfix({ - 'speed': d['_speed_str'], - 'size': f"{d['_downloaded_bytes_str'].strip()}/{d['_total_bytes_str'].strip()}", - }) - - video = args[0] - kwargs = args[1] - - output_dict = {'downloaded_video_id': None, 'video_id': video['id'], 'video_critical_err_msg': [], 'video_critical_err_msg_short': [], 'status_msg': [], 'logger_msg': []} # empty object - - if not kwargs['ignore_downloaded'] and not video['channel_id'] or not video['channel'] or not video['channel_url']: - if video['duration'] or isinstance(video['view_count'], int): - # Sometimes videos don't have channel_id, channel, or channel_url but are actually valid. Like shorts. - pass - else: - output_dict['video_critical_err_msg_short'].append('unavailable.') - return output_dict - - # Clean of forign languages - video['title'] = unidecode(video['title']) - - if len(kwargs['bars']): - bar_enabled = True - got_lock = False - while not got_lock: # Get a bar - for item in kwargs['bars']: - if item[1].acquire(timeout=0.01): - got_lock = True - bar_offset = item[0] - bar_lock = item[1] - break - else: - time.sleep(random.uniform(0.1, 0.5)) - kwargs['ydl_opts']['progress_hooks'] = [progress_hook] - desc_with = int(np.round(os.get_terminal_size()[0] * (1 / 4))) - bar = tqdm(total=100, position=bar_offset, desc=f"{video['id']} - {video['title']}".ljust(desc_with)[:desc_with], bar_format='{l_bar}{bar}| {elapsed}<{remaining}{postfix}', leave=False) - else: - bar_enabled = False - - # got_lock = False - # # if len(kwargs['bars']): - # while not got_lock: # We're going to wait until a bar is available for us to use. - # for item in kwargs['bars']: - # # if not is_manager_lock_locked(item[1]): - # got_lock = item[1].acquire(timeout=0.01) # get the lock ASAP and don't wait if we didn't get it. - # - # if got_lock: - # print('GOT LOCK:', video['id']) - # # Now that we've gotten the lock, set some variables related to the bar - # offset = item[0] - # bar_lock = item[1] - # break - # else: - # print('WAITING FOR LOCK:', video['id']) - # time.sleep(uniform(0.1, 0.9)) - - start_time = time.time() - - try: - kwargs['ydl_opts']['logger'] = ytdl_logger() # dummy silent logger - yt_dlp = ydl.YDL(kwargs['ydl_opts']) - video_n = yt_dlp.get_info(video['url']) - - if not video_n: - output_dict['video_critical_err_msg_short'].append('failed to get info. Unavailable?') - if bar_enabled: - bar.close() - bar_lock.release() - return output_dict - - video_n['url'] = video['url'] - video = video_n - del video_n - - # We created a new dict - video['title'] = unidecode(video['title']) - video['uploader'] = unidecode(video['uploader']) # now this info is present since we fetched it - - # TODO: do we also need to remove the @ char? 
- video_filename = remove_special_chars_linux( - ydl.get_output_templ(video_id=video['id'], title=video['title'], uploader=video['uploader'], uploader_id=video['uploader_id'], include_ext=False), special_chars=['/'] - ) - - # Make sure the video title isn't too long - while len(video_filename) >= name_max - 3: # -3 so that I can add ... - video['title'] = video['title'][:-1] - video_filename = remove_special_chars_linux( - ydl.get_output_templ( - video_id=video['id'], - title=video['title'] + '...', - uploader=video['uploader'], - uploader_id=video['uploader_id'], - include_ext=False - ), special_chars=['/']) - - base_path = str(Path(kwargs['output_dir'], video_filename)) - - kwargs['ydl_opts']['outtmpl'] = f"{base_path}.%(ext)s" - - # try: - # base_path = os.path.splitext(Path(kwargs['output_dir'], yt_dlp.prepare_filename(video)))[0] - # except AttributeError: - # # Sometimes we won't be able to pull the video info so just use the video's ID. - # base_path = kwargs['output_dir'] / video['id'] - ylogger = ytdl_logger(setup_file_logger(video['id'], base_path + '.log')) - kwargs['ydl_opts']['logger'] = ylogger - with ydl_ydl.YoutubeDL(kwargs['ydl_opts']) as y: - error_code = y.download(video['url']) - # yt_dlp = ydl.YDL(kwargs['ydl_opts']) # recreate the object with the correct logging path - # error_code = yt_dlp(video['url']) # Do the download - - if not error_code: - elapsed = round(math.ceil(time.time() - start_time) / 60, 2) - output_dict['logger_msg'].append(f"'{video['title']}' - Downloaded in {elapsed} min.") - output_dict['downloaded_video_id'] = video['id'] - else: - output_dict['video_critical_err_msg'] = output_dict['video_critical_err_msg'] + ylogger.errors - except Exception: - output_dict['video_critical_err_msg'].append(f"EXCEPTION -> {traceback.format_exc()}") - if bar_enabled: - bar.update(100 - bar.n) - - if bar_enabled: - bar.close() - bar_lock.release() - return output_dict - - -def bar_eraser(video_bars, eraser_exit): - while not eraser_exit.value: - for i, item in enumerate(video_bars): - if eraser_exit.value: - return - i = int(i) - bar_lock = video_bars[i][1] - if video_bars[i][1].acquire(timeout=0.1): - bar = tqdm(position=video_bars[i][0], leave=False, bar_format='\x1b[2K') - bar.close() - bar_lock.release() - - # Old queue and queue processor threads - # manager = Manager() - # queue = manager.dict() - # queue_lock = manager.Lock() - # def eraser(): - # nonlocal queue - # try: - # while not eraser_exit.value: - # for i in queue.keys(): - # if eraser_exit.value: - # return - # i = int(i) - # lock = video_bars[i][1].acquire(timeout=0.1) - # bar_lock = video_bars[i][1] - # if lock: - # bar = tqdm(position=video_bars[i][0], leave=False, bar_format='\x1b[2K') - # bar.close() - # with queue_lock: - # del queue_dict[i] - # queue = queue_dict - # bar_lock.release() - # except KeyboardInterrupt: - # sys.exit(0) - # except multiprocessing.managers.RemoteError: - # sys.exit(0) - # except SystemExit: - # sys.exit(0) - # - # try: - # Thread(target=eraser).start() - # while not eraser_exit.value: - # for i, item in enumerate(video_bars): - # if eraser_exit.value: - # return - # # Add bars to the queue - # if is_manager_lock_locked(item[1]): - # with queue_lock: - # queue_dict = queue - # queue_dict[i] = True - # queue = queue_dict - # except KeyboardInterrupt: - # sys.exit(0) - # except multiprocessing.managers.RemoteError: - # sys.exit(0) - # except SystemExit: - # sys.exit(0) - - -class ServiceExit(Exception): - """ - Custom exception which is used to trigger the clean exit - 
-    of all running threads and the main program.
-    """
-    pass
diff --git a/requirements.txt b/requirements.txt
index af3c228..5cb26ea 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,11 +1,16 @@
 yt-dlp
-psutil
+psutil~=5.9.5
 tqdm
-mergedeep
+mergedeep~=1.3.4
 numpy
-pyyaml
-appdirs
+pyyaml~=6.0
+appdirs~=1.4.4
 phantomjs
-unidecode
+unidecode~=1.3.6
 ffmpeg-python
-hurry.filesize
\ No newline at end of file
+hurry.filesize
+mysql-connector-python~=8.0.33
+Flask-SQLAlchemy
+PyMySQL
+Flask~=2.3.2
+SQLAlchemy~=2.0.18
\ No newline at end of file
diff --git a/process/__init__.py b/server/__init__.py
similarity index 100%
rename from process/__init__.py
rename to server/__init__.py
diff --git a/server/api/__init__.py b/server/api/__init__.py
new file mode 100644
index 0000000..c1f2e0e
--- /dev/null
+++ b/server/api/__init__.py
@@ -0,0 +1,17 @@
+from flask import Flask
+
+from server import opts
+from server.api.job_tracker import JobTracker
+from . import shared
+from .create_app import create_app
+from .database import db
+
+
+def start():
+    app = Flask(__name__)
+    app.config['SQLALCHEMY_DATABASE_URI'] = f"mysql+pymysql://{opts.mysql['user']}:{opts.mysql['password']}@{opts.mysql['host']}/{opts.mysql['database']}"
+    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
+    db.init_app(app)
+
+    app = create_app(shared.global_job_tracker)
+    app.run(host='0.0.0.0', port=8081)
diff --git a/server/api/api/__init__.py b/server/api/api/__init__.py
new file mode 100644
index 0000000..9536221
--- /dev/null
+++ b/server/api/api/__init__.py
@@ -0,0 +1 @@
+from .api import api_route
diff --git a/server/api/api/api.py b/server/api/api/api.py
new file mode 100644
index 0000000..35801a2
--- /dev/null
+++ b/server/api/api/api.py
@@ -0,0 +1,9 @@
+from flask import Blueprint, jsonify
+
+api_route = Blueprint('api', __name__)
+
+
+@api_route.route('/')
+@api_route.route('/')
+def add_list():
+    return jsonify({'message': 'automated-youtube-dl'}), 404
diff --git a/server/api/create_app.py b/server/api/create_app.py
new file mode 100644
index 0000000..62abe2b
--- /dev/null
+++ b/server/api/create_app.py
@@ -0,0 +1,27 @@
+from flask import Flask, current_app
+
+from .api import api_route
+from .database import db
+from .health import health_route
+from .job_tracker import JobTracker
+from .jobs import job_route
+from .list import list_route
+from .. import opts
+from . import shared
+
+
+def create_app(j_t: JobTracker):
+    shared.global_job_tracker = j_t
+    app = Flask(__name__)
+    app.config['SQLALCHEMY_DATABASE_URI'] = f"mysql+pymysql://{opts.mysql['user']}:{opts.mysql['password']}@{opts.mysql['host']}/{opts.mysql['database']}"
+    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
+
+    app.register_blueprint(api_route, url_prefix='/api')
+    app.register_blueprint(job_route, url_prefix='/api/job')
+    app.register_blueprint(list_route, url_prefix='/api/list')
+    app.register_blueprint(health_route, url_prefix='/api/health')
+
+    db.init_app(app)
+    # with app.app_context():
+    #     print(current_app.url_map)
+    return app
diff --git a/server/api/database.py b/server/api/database.py
new file mode 100644
index 0000000..f0b13d6
--- /dev/null
+++ b/server/api/database.py
@@ -0,0 +1,3 @@
+from flask_sqlalchemy import SQLAlchemy
+
+db = SQLAlchemy()
diff --git a/server/api/health/__init__.py b/server/api/health/__init__.py
new file mode 100644
index 0000000..97f90f3
--- /dev/null
+++ b/server/api/health/__init__.py
@@ -0,0 +1 @@
+from .health import health_route
diff --git a/server/api/health/health.py b/server/api/health/health.py
new file mode 100644
index 0000000..187d52e
--- /dev/null
+++ b/server/api/health/health.py
@@ -0,0 +1,20 @@
+from datetime import datetime
+
+from flask import Blueprint, jsonify
+
+from server import opts
+from server.health import run_all_functions
+
+health_route = Blueprint('health', __name__)
+
+
+@health_route.route('/', methods=['GET'])
+def health_check():
+    uptime = str(datetime.now() - opts.start_time)
+    exit_code, api_results = run_all_functions()
+
+    return jsonify({
+        'health': exit_code,
+        'uptime': uptime,
+        'api_results': api_results
+    })
diff --git a/server/api/job_tracker.py b/server/api/job_tracker.py
new file mode 100644
index 0000000..7ffc5cd
--- /dev/null
+++ b/server/api/job_tracker.py
@@ -0,0 +1,122 @@
+import copy
+import datetime
+import threading
+import uuid
+from multiprocessing import Manager
+from multiprocessing.managers import BaseManager
+from typing import Dict
+
+from server.mysql import query
+
+
+class Job:
+    _manager = Manager()
+
+    def __init__(self, list_id):
+        self._list_id = list_id
+        self._job_id = str(uuid.uuid4())
+        self._status = 'running'
+        self._start_time = int(datetime.datetime.now(datetime.timezone.utc).timestamp() * 1e3)
+        self._end_time = None
+        self._success = None
+
+        self._progresses: dict[str, dict]
+        self._manager = Manager()
+        self._progresses = self._manager.dict()
+        self._completed = self._manager.list()
+
+    def success(self, s: bool):
+        self._success = s
+
+    def id(self):
+        return self._job_id
+
+    def list_id(self):
+        return self._list_id
+
+    def status(self):
+        return self._status
+
+    def start_time(self):
+        return self._start_time
+
+    def end_time(self):
+        return self._end_time
+
+    def new_progress_thread(self, video_id: str) -> dict:
+        self._progresses[video_id] = self._manager.dict({
+            'downloaded_bytes': -1,
+            'percent': -1,
+            'total': -1,
+            'total_bytes': -1,
+            'total_bytes_estimate': -1,
+            'speed': -1,
+            'size': -1,
+            'start_time': 0
+        })
+        return self._progresses[video_id]
+
+    def del_progress_thread(self, video_id: str):
+        del self._progresses[video_id]
+
+    def progresses(self):
+        return copy.deepcopy(self._progresses)
+
+    def add_completed(self, video_id: str):
+        self._completed.append(video_id)
+
+    def finish(self):
+        self._status = 'finished'
+        self._end_time = int(datetime.datetime.now(datetime.timezone.utc).timestamp() * 1e3)
+        query("UPDATE `jobs` SET `result` = 
'finished' WHERE `jobs`.`job_id` = %s", (self._job_id,)) + + def dict(self) -> dict: + return { + 'job_id': self._job_id, + 'list_id': self._list_id, + 'status': self._status, + 'start_time': self._start_time, + 'end_time': self._end_time, + 'progresses': copy.deepcopy({k: v for k, v in self._progresses.items()}), + 'completed': list(self._completed), + 'success': self._success + } + + +class JobManager(BaseManager): + pass + + +class JobTracker: + def __init__(self): + self._lock = threading.Lock() + self._jobs = {} + + def del_job(self, job_id): + del self._jobs[job_id] + + # def new_job(self, list_id: str): + # job_id = str(uuid.uuid4()) + # assert job_id not in self._jobs + # with self.lock: + # self._jobs[job_id] = Job(job_id, list_id) + # return self._jobs[job_id] + + def add_job(self, job: Job): + assert job.id() not in self._jobs + with self._lock: + self._jobs[job.id()] = job + + def finish_job(self, job_id): + with self._lock: + job = self._jobs.get(job_id) + if job: + job.finish() + + def get_job(self, job_id) -> Job: + with self._lock: + return self._jobs.get(job_id) + + @property + def jobs(self) -> Dict[str, Job]: + return self._jobs.copy() diff --git a/server/api/jobs/__init__.py b/server/api/jobs/__init__.py new file mode 100644 index 0000000..34cb553 --- /dev/null +++ b/server/api/jobs/__init__.py @@ -0,0 +1 @@ +from .jobs import job_route diff --git a/server/api/jobs/jobs.py b/server/api/jobs/jobs.py new file mode 100644 index 0000000..5726a11 --- /dev/null +++ b/server/api/jobs/jobs.py @@ -0,0 +1,139 @@ +import multiprocessing +from datetime import datetime +from multiprocessing import Process + +from flask import Blueprint, jsonify, request + +from .queue import job_queue, job_status, queued_jobs +from .. import shared +from ..job_tracker import Job, JobManager +from ..list.video_list import VideoList +from ... 
import opts +from ...helpers.misc import get_elapsed_time_from_ms +from ...mysql import query +from ...process.main import do_download + +job_route = Blueprint('job', __name__) + +JobManager.register('Job', Job) + + +@job_route.route('/start', methods=['POST']) +def submit_job(): + data = request.get_json(silent=True) + if not isinstance(data, dict) or not data: + return jsonify({'error': 'Data should be a key-value mapping.'}), 400 + + target_list = data.get('list') + ignore_downloaded = data.get('ignore-downloaded', False) + if not target_list: + return jsonify({'error': 'list parameter is required'}), 400 + + l = VideoList.query.filter((VideoList.name == target_list) | (VideoList.id == target_list)).first() + if not l: + return jsonify({'error': 'list not found'}), 400 + + running_lists = {v.list_id(): v for k, v in shared.global_job_tracker.jobs.items() if v.status() == 'running'} + if l.id in running_lists.keys(): + return jsonify({'error': 'job for list already running', 'job_id': running_lists[l.id].id()}), 409 + + manager = JobManager() + manager.start() + job = manager.Job(l.id) + shared.global_job_tracker.add_job(job) + + query('INSERT INTO `jobs` (`job_id`, `result`, `started`) VALUES (%s, %s, UNIX_TIMESTAMP())', (job.id(), 'running')) + + # Add the job to the queue and the list of queued jobs + job_queue.put((job, l.id, l.url, opts.base_output, ignore_downloaded)) + queued_jobs.append(job.id()) + + # Update the job status + job_status[job.id] = 'queued' + queued = True + + # Start a new process for each job if it's not already running + if job_queue.qsize() > 0 and not any(p.name == 'do_download' and p.is_alive() for p in multiprocessing.active_children()): + p = Process(target=do_download, name='do_download') + p.start() + queued = False + + message = 'Job queued' if queued else 'Job started' + status = 'started' if not queued else 'queued' + adjusted_queue_size = job_queue.qsize() - 1 if job_queue.qsize() > 0 else 0 + + return jsonify({'status': status, 'message': message, 'job_id': job.id(), 'queue_size': adjusted_queue_size}), 200 + + +@job_route.route('/status', methods=['GET']) +def job_get_status(): + job_id = request.args.get('id') + if not job_id: + return jsonify({'error': 'id parameter is required'}), 400 + + in_queue = job_id in queued_jobs + status = job_status.get(job_id) + if not status: + return jsonify({'error': 'Job not found'}), 400 + + job = shared.global_job_tracker.get_job(job_id) + if not job: + return jsonify({'error': 'Job not found'}), 400 + + if job.status() == 'running': + elapsed_s = int(get_elapsed_time_from_ms(job.start_time()).total_seconds()) + else: + elapsed_s = int((datetime.fromtimestamp(job.end_time() / 1000.0) - datetime.fromtimestamp(job.start_time() / 1000.0)).total_seconds()) + + return jsonify({'in_queue': in_queue, 'job': job.dict(), 'elapsed': elapsed_s}), 200 + + +@job_route.route('/result', methods=['GET']) +def job_result(): + job_id = request.args.get('id') + if not job_id: + return jsonify({'error': 'id parameter is required'}), 400 + status_query = query('SELECT * FROM `jobs` WHERE `job_id`=%s', (job_id,), dictionary=True) + if len(status_query) > 1: + return jsonify({'error': 'multiple jobs for that ID'}), 500 + if len(status_query) == 0: + return jsonify({'error': 'no jobs for that ID'}), 400 + del status_query[0]['id'] + return jsonify(status_query[0]), 200 + + +@job_route.route('/log', methods=['GET']) +def job_log(): + job_id = request.args.get('id') + if not job_id: + return jsonify({'error': 'id parameter is required'}), 
400 + job_query = query(''' + SELECT j.*, l.* + FROM `jobs` j + LEFT JOIN `logging_job_output` l ON j.job_id = l.job_id + WHERE j.job_id=%s + ''', (job_id,), dictionary=True) + if len(job_query) == 0: + return jsonify({'error': 'Job not found'}), 400 + if job_query[0]['level'] is None: + return jsonify({'error': 'No logs for this job'}), 400 + + result = [] + for line in job_query: + l = line.copy() + del l["job_id"] + del l["result"] + del l["started"] + del l['id'] + result.append(l) + return jsonify({ + 'items': result, + 'job_id': job_query[0]['job_id'], + 'result': job_query[0]['result'], + 'started': job_query[0]['started'] + }), 200 + + +@job_route.route('/active', methods=['GET']) +def job_job_status(): + return jsonify({'jobs': [k for k, v in shared.global_job_tracker.jobs.items() if v.status() == 'running'], 'queue': list(queued_jobs), 'queue_size': job_queue.qsize()}), 200 diff --git a/server/api/jobs/queue.py b/server/api/jobs/queue.py new file mode 100644 index 0000000..42da5da --- /dev/null +++ b/server/api/jobs/queue.py @@ -0,0 +1,6 @@ +from multiprocessing import Manager + +manager = Manager() +job_queue = manager.Queue() +job_status = manager.dict() +queued_jobs = manager.list() diff --git a/server/api/list/__init__.py b/server/api/list/__init__.py new file mode 100644 index 0000000..a41dfba --- /dev/null +++ b/server/api/list/__init__.py @@ -0,0 +1 @@ +from .lists import list_route diff --git a/server/api/list/lists.py b/server/api/list/lists.py new file mode 100644 index 0000000..03a92e6 --- /dev/null +++ b/server/api/list/lists.py @@ -0,0 +1,52 @@ +import re +from urllib.parse import urlparse + +from flask import Blueprint, jsonify, request +from yt_dlp import YoutubeDL + +from server.helpers.regex import url_regex +from .video_list import VideoList +from ..database import db +from ...mysql import query +from ...process.ytlogging import YtdlLogger + +list_route = Blueprint('lists', __name__) + + +@list_route.route('/add', methods=['POST']) +def add_list(): + data = request.get_json(silent=True) + if not isinstance(data, dict) or not data: + return jsonify({'error': 'Data should be a key-value mapping.'}), 400 + + url = data.get('url') + + # Check if it's a valid URL + if not url or not urlparse(url).scheme or not re.match(url_regex, url): + return jsonify({'error': 'Invalid URL'}), 400 + + # Check if it's a YouTube URL + if 'youtube.com' not in url: + return jsonify({'error': 'URL is not a YouTube URL'}), 400 + + # Check if the list is already in the database + existing_list = VideoList.query.filter_by(url=url).first() + if existing_list: + return jsonify({'added': False, 'message': 'List already in database', 'name': existing_list.name, 'url': existing_list.url, 'id': existing_list.id}), 200 + + # Use yt-dlp to get the playlist name + with YoutubeDL({'extract_flat': 'in_playlist', 'logger': YtdlLogger('logs')}) as ydl: + info_dict = ydl.extract_info(url, download=False) + playlist_name = info_dict.get('title', None) + + if not playlist_name: + return jsonify({'error': 'Could not get playlist name'}), 400 + + # Add the list to the database + new_list = VideoList(name=playlist_name, url=url) + db.session.add(new_list) + db.session.commit() + + list_id = query(f'SELECT * FROM `video_lists` WHERE `url`=%s', (url,)) + + return jsonify({'added': True, 'message': 'List added to database', 'name': playlist_name, 'url': url, 'id': list_id}), 201 diff --git a/server/api/list/video_list.py b/server/api/list/video_list.py new file mode 100644 index 0000000..d237a83 --- /dev/null 
+++ b/server/api/list/video_list.py @@ -0,0 +1,13 @@ +from sqlalchemy import text + +from ..database import db + + +class VideoList(db.Model): + __tablename__ = 'video_lists' + + id = db.Column(db.Integer, primary_key=True) + name = db.Column(db.Text, nullable=False) + url = db.Column(db.Text, nullable=False) + added = db.Column(db.DateTime, server_default=text('CURRENT_TIMESTAMP')) + last_ran = db.Column(db.DateTime) diff --git a/server/api/shared.py b/server/api/shared.py new file mode 100644 index 0000000..7f0c4ff --- /dev/null +++ b/server/api/shared.py @@ -0,0 +1,3 @@ +from server.api import JobTracker + +global_job_tracker: JobTracker diff --git a/server/background.py b/server/background.py new file mode 100644 index 0000000..23e7b24 --- /dev/null +++ b/server/background.py @@ -0,0 +1,37 @@ +import threading +import time +from datetime import datetime, timezone + +from server import opts +from server.api import shared +from server.mysql import db_logger, query + +logger = db_logger('BACKGROUND', 'logs') + + +def start(): + threading.Thread(target=check_dict).start() + threading.Thread(target=jobs_mysql).start() + + +def jobs_mysql(): + """ + Background thread that does all the stuff related to jobs and the database + """ + query("UPDATE `jobs` SET `result`='exited' where result='running'", ()) # Set old jobs to finished in case the last process died + # while True: + # for _, job in shared.global_job_tracker.jobs.items(): + # if job.status == 'finished': + # query("UPDATE `jobs` SET `result` = 'finished' WHERE `jobs`.`id` = %s", (job.id(),)) + # logger.debug(f"Marked job as done: {job.id()}") + # time.sleep(1) + + +def check_dict(): + while True: + now = int(datetime.now(timezone.utc).timestamp() * 1e3) + keys_to_delete = [key for key, j in shared.global_job_tracker.jobs.items() if j.end_time() and now - j.end_time() >= opts.jobs_cleanup_time * 60 * 1000] + for key in keys_to_delete: + shared.global_job_tracker.del_job(key) + logger.debug(f'Deleted old job status: {key}') + time.sleep(opts.jobs_cleanup_time) diff --git a/server/health.py b/server/health.py new file mode 100644 index 0000000..065eee0 --- /dev/null +++ b/server/health.py @@ -0,0 +1,18 @@ +def health_check_one() -> (bool, any): + return True, 'testing123' + + +def health_check_two() -> (bool, any): + return True, 'banana' + + +def run_all_functions(): + results = {} + exit_code = 'ok' + for name, func in globals().items(): + if callable(func) and func.__name__ != "run_all_functions": + success, data = func() + if not success: + exit_code = 'crit' + results[name] = data + return exit_code, results diff --git a/server/helpers/__init__.py b/server/helpers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/server/helpers/misc.py b/server/helpers/misc.py new file mode 100644 index 0000000..f25846b --- /dev/null +++ b/server/helpers/misc.py @@ -0,0 +1,7 @@ +import datetime + + +def get_elapsed_time_from_ms(timestamp): + timestamp = datetime.datetime.fromtimestamp(timestamp / 1000.0) + current_time = datetime.datetime.now() + return current_time - timestamp diff --git a/server/helpers/regex.py b/server/helpers/regex.py new file mode 100644 index 0000000..97993e9 --- /dev/null +++ b/server/helpers/regex.py @@ -0,0 +1,14 @@ +import re + +url_regex = re.compile(r'^(?:http|ftp)s?://' # http:// or https:// + r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|' # domain... + r'localhost|' # localhost... + r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})' # ...or ip + r'(?::\d+)?' 
# optional port + r'(?:/?|[/?]\S+)$', re.IGNORECASE) + +ansi_escape_regex = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') + + +def strip_color_codes(s): + return re.sub(r'\x1b\[[0-9;]*m', '', s) diff --git a/server/logging.py b/server/logging.py new file mode 100644 index 0000000..998e6cf --- /dev/null +++ b/server/logging.py @@ -0,0 +1,42 @@ +import logging +import re +import threading + +# Universal formatter +formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + +# Create a thread-local object +local = threading.local() + + +# def new_db_connection(thread: bool = True): +# if opts.mysql['host']: +# if thread: +# # Check if a connection already exists for the current thread +# if not hasattr(local, 'db'): +# # Create a new connection if it doesn't exist +# local.db = mysql.connector.connect( +# host=opts.mysql['host'], +# user=opts.mysql['user'], +# password=opts.mysql['password'], +# database=opts.mysql['database'] +# ) +# return local.db +# else: +# return mysql.connector.connect( +# host=opts.mysql['host'], +# user=opts.mysql['user'], +# password=opts.mysql['password'], +# database=opts.mysql['database'] +# ) + + +def print_without_paths(msg): + """ + Remove any filepaths or other stuff we don't want in the message. + """ + m = re.match(r'(^[^/]+(?:\\.[^/]*)*)', msg) + if m: + msg = m.group(1) + m1 = re.match(r'^(.*?): ', msg) + return msg.strip('to "').strip('to: ').strip() diff --git a/server/mysql.py b/server/mysql.py new file mode 100644 index 0000000..380f759 --- /dev/null +++ b/server/mysql.py @@ -0,0 +1,207 @@ +import logging +import os +import re +from concurrent.futures import ThreadPoolExecutor +from pathlib import Path + +import mysql.connector +import mysql.connector +from mysql.connector import Error + +from server import opts +from .logging import formatter + +current_dir = Path(os.path.dirname(os.path.realpath(__file__))) + + +class DatabaseConnection: + def __init__(self, host=None, user=None, password=None, database=None): + if host: + self.host = host + else: + self.host = opts.mysql['host'] + if user: + self.user = user + else: + self.user = opts.mysql['user'] + if password: + self.password = password + else: + self.password = opts.mysql['password'] + if database: + self.database = database + else: + self.database = opts.mysql['database'] + self.connection = None + + def __enter__(self): + self.connection = mysql.connector.connect( + host=self.host, + user=self.user, + password=self.password, + database=self.database + ) + return self.connection + + def __exit__(self, exc_type, exc_val, exc_tb): + if self.connection: + self.connection.close() + + +def test_mysql_connection() -> (bool, str): + conn = None + success = False + error = None + try: + with DatabaseConnection() as conn: + if conn.is_connected(): + cursor = conn.cursor() + cursor.execute("CREATE TEMPORARY TABLE test_table(id INT)") + cursor.execute("DROP TEMPORARY TABLE test_table") + success = True + except Error as e: + success = False + error = e + finally: + if conn: + conn.close() + return success, error + + +def init_db(): + sql_script = current_dir / 'sql' / 'database.sql' + log = logging.getLogger('MAIN') + die = False + with DatabaseConnection() as conn: + f = sql_script.read_text() + cursor = conn.cursor() + for statement in f.split(';'): + if statement.strip() != '': + try: + cursor.execute(statement) + except Exception as e: + log.fatal(f'failed to execute setup SQL. 
{e.__class__.__name__} - {e}')
+                    die = True
+        if die:
+            log.fatal('The setup SQL failed to run. Please erase the existing tables and either re-run the program or execute the SQL script manually.')
+            quit(1)
+        conn.commit()
+
+
+def check_if_database_exists(partial: bool = False):
+    # Get the tables that should be in the DB based on the creation SQL script
+    pattern = re.compile(r'^CREATE TABLE `(.*?)`$')
+    sql_script = current_dir / 'sql' / 'database.sql'
+    should_exist = []
+    for i, line in enumerate(open(sql_script)):
+        for match in re.finditer(pattern, line):
+            should_exist.append(match.group(1))
+
+    with DatabaseConnection() as conn:
+        cursor = conn.cursor()
+        cursor.execute("show tables;")
+        result = cursor.fetchall()
+        if not len(result):
+            # No tables in DB
+            return False, should_exist
+        missing_tables = []
+        if partial:
+            for s in should_exist:
+                found = False
+                for table in result:
+                    t = table[0]
+                    if s == t:
+                        found = True
+                        break
+                if not found:
+                    missing_tables.append(s)
+        return (len(missing_tables) == 0), missing_tables
+
+
+def get_console_logger(logger: logging.Logger = None, debug: bool = False, stream_handler: bool = False):
+    """
+    Sometimes we need a console logger.
+    You can pass your own logger to add a console handler to, or get a new one.
+    """
+    if not logger:
+        logger = logging.getLogger('MAIN')
+    if debug:
+        logger.setLevel(logging.DEBUG)
+    else:
+        logger.setLevel(logging.INFO)
+    console_handler = logging.StreamHandler()
+    console_handler.setFormatter(formatter)
+
+    if stream_handler:
+        return console_handler
+    else:
+        logger.addHandler(console_handler)
+        return logger
+
+
+def db_logger(name, table, job_id: str = None, level: int = None, console: bool = False):
+    """
+    Log to the database and the console.
+    """
+    logger = logging.getLogger(name)
+    if not level:
+        if opts.verbose:
+            logger.setLevel(logging.DEBUG)
+        else:
+            logger.setLevel(logging.INFO)
+    else:
+        logger.setLevel(level)
+
+    # Database handler
+    db_handler = MySQLHandler(name, table, job_id)
+    db_handler.setFormatter(formatter)
+    logger.addHandler(db_handler)
+
+    if console:
+        console_handler = get_console_logger(logger, opts.verbose, stream_handler=True)
+        logger.addHandler(console_handler)
+    return logger
+
+
+class MySQLHandler(logging.Handler):
+    def __init__(self, name, table, job_id: str = None):
+        logging.Handler.__init__(self)
+        self.name = name
+        self.job_id = job_id
+        if table not in ['logs', 'jobs']:
+            raise ValueError(f'table value must be `logs` or `jobs`, not {table}')
+        self.table = table
+        self.executor = ThreadPoolExecutor(max_workers=5)
+
+    def emit(self, record):
+        self.executor.submit(self._emit, record)
+
+    def _emit(self, record):
+        with DatabaseConnection() as conn:
+            cursor = conn.cursor()
+            # mysql.connector only accepts %s placeholders, and record.created is a Unix timestamp,
+            # so convert it with FROM_UNIXTIME() before it hits the DATETIME column. The logger name
+            # is folded into the message because the log tables don't have a separate column for it.
+            message = f'{self.name}: {record.getMessage()}'
+            if self.table == 'logs':
+                cursor.execute(
+                    "INSERT INTO logging_logs (level, time, message) VALUES (%s, FROM_UNIXTIME(%s), %s)",
+                    (record.levelname, record.created, message)
+                )
+            elif self.table == 'jobs':
+                cursor.execute(
+                    "INSERT INTO logging_job_output (job_id, level, time, message) VALUES (%s, %s, FROM_UNIXTIME(%s), %s)",
+                    (self.job_id, record.levelname, record.created, message)
+                )
+            else:
+                raise ValueError
+            conn.commit()
+
+
+def query(query_str: str, values: tuple, commit: bool = False, dictionary: bool = False):
+    with DatabaseConnection() as conn:
+        cursor = conn.cursor(dictionary=dictionary)
+        if values:
+            cursor.execute(query_str, values)
+        else:
+            cursor.execute(query_str)
+        if commit or query_str.startswith('INSERT') or 
query_str.startswith('UPDATE'): + conn.commit() + else: + return cursor.fetchall() diff --git a/server/opts.py b/server/opts.py new file mode 100644 index 0000000..fc0ff6a --- /dev/null +++ b/server/opts.py @@ -0,0 +1,21 @@ +import datetime +import os +import time + +base_output = None +log_dir = None +max_size = 1100 +ydlp_verbose = False +threads = os.cpu_count() - 1 +jobs_cleanup_time = 60 +verbose = False + +mysql = { + 'host': None, + 'user': None, + 'password': None, + 'database': None +} + +# Dynamic variables +start_time = int(datetime.datetime.now(datetime.timezone.utc).timestamp() * 1e3) diff --git a/server/process/__init__.py b/server/process/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/process/funcs.py b/server/process/funcs.py similarity index 94% rename from process/funcs.py rename to server/process/funcs.py index 10d8ede..fe44728 100644 --- a/process/funcs.py +++ b/server/process/funcs.py @@ -17,7 +17,7 @@ def restart_program(): for handler in p.open_files() + p.connections(): os.close(handler.fd) except Exception as e: - print('Could not restart Automated FBI Reporter after update.') + print('Could not restart process after update.') print(e) sys.exit(1) python = sys.executable @@ -79,6 +79,10 @@ def remove_duplicates_from_playlist(entries): return videos +def sanitize_colored_string(s): + return re.sub(r'\x1b\[[0-9;]*m', '', s) + + def remove_special_chars_linux(string, special_chars: list = None): if special_chars is None: special_chars = ['\\', '`', '*', '_', '{', '}', '[', ']', '(', ')', '>', '#', '+', '-', '.', '!', '$', '\''] diff --git a/server/process/main.py b/server/process/main.py new file mode 100644 index 0000000..de83394 --- /dev/null +++ b/server/process/main.py @@ -0,0 +1,131 @@ +import concurrent +import datetime +import traceback +from concurrent.futures import ProcessPoolExecutor +from pathlib import Path +from queue import Empty + +from server import opts +from server.api.jobs.queue import job_queue, job_status, queued_jobs +from server.mysql import db_logger +from server.process.funcs import remove_duplicates_from_playlist +from server.process.mysql import insert_video +from server.process.threads import download_video +from server.process.ytlogging import YtdlLogger +from ydl.yt_dlp import YDL + +# TODO: https://github.com/TheFrenchGhosty/TheFrenchGhostys-Ultimate-YouTube-DL-Scripts-Collection/blob/master/docs/Scripts-Type.md#archivist-scripts + +# https://github.com/yt-dlp/yt-dlp#embedding-examples +ydl_opts = { + # TODO: https://github.com/TheFrenchGhosty/TheFrenchGhostys-Ultimate-YouTube-DL-Scripts-Collection/blob/master/docs/Details.md + # https://old.reddit.com/r/DataHoarder/comments/c6fh4x/after_hoarding_over_50k_youtube_videos_here_is/ + 'format': 
f'(bestvideo[filesize<{opts.max_size}M][vcodec^=av01][height>=1080][fps>30]/bestvideo[filesize<{opts.max_size}M][vcodec=vp9.2][height>=1080][fps>30]/bestvideo[filesize<{opts.max_size}M][vcodec=vp9][height>=1080][fps>30]/bestvideo[filesize<{opts.max_size}M][vcodec^=av01][height>=1080]/bestvideo[filesize<{opts.max_size}M][vcodec=vp9.2][height>=1080]/bestvideo[filesize<{opts.max_size}M][vcodec=vp9][height>=1080]/bestvideo[filesize<{opts.max_size}M][height>=1080]/bestvideo[filesize<{opts.max_size}M][vcodec^=av01][height>=720][fps>30]/bestvideo[filesize<{opts.max_size}M][vcodec=vp9.2][height>=720][fps>30]/bestvideo[filesize<{opts.max_size}M][vcodec=vp9][height>=720][fps>30]/bestvideo[filesize<{opts.max_size}M][vcodec^=av01][height>=720]/bestvideo[filesize<{opts.max_size}M][vcodec=vp9.2][height>=720]/bestvideo[filesize<{opts.max_size}M][vcodec=vp9][height>=720]/bestvideo[filesize<{opts.max_size}M][height>=720]/bestvideo[filesize<{opts.max_size}M])+(bestaudio[acodec=opus]/bestaudio)/best', + 'merge_output_format': 'mkv', + 'logtostderr': True, + 'embedchapters': True, + 'writethumbnail': True, + # Save the thumbnail to a file. Embedding seems to be broken right now so this is an alternative. + 'embedthumbnail': True, + 'embeddescription': True, + 'writesubtitles': True, + # 'allsubtitles': True, # Download every language. + 'subtitlesformat': 'vtt', + 'subtitleslangs': ['en'], + 'writeautomaticsub': True, + 'writedescription': True, + 'ignoreerrors': True, + 'continuedl': False, + 'addmetadata': True, + 'writeinfojson': True, + 'verbose': opts.ydlp_verbose, + 'postprocessors': [ + {'key': 'FFmpegEmbedSubtitle'}, + {'key': 'FFmpegMetadata', 'add_metadata': True}, + {'key': 'EmbedThumbnail', 'already_have_thumbnail': True}, + {'key': 'FFmpegThumbnailsConvertor', 'format': 'jpg', 'when': 'before_dl'}, + # {'key': 'FFmpegSubtitlesConvertor', 'format': 'srt'} + ], + # 'external_downloader': 'aria2c', + # 'external_downloader_args': ['-j 32', '-s 32', '-x 16', '--file-allocation=none', '--optimize-concurrent-downloads=true', '--http-accept-gzip=true', '--continue=true'], +} + + +def do_download(): + while True: + try: + # Get a job from the queue + job, l_id, url, base_output, ignore_downloaded = job_queue.get(timeout=5) + + # Remove the job from the list of queued jobs + queued_jobs.remove(job.id()) + + # Update the job status + job_status[job.id()] = 'running' + + start_time = int(datetime.datetime.now(datetime.timezone.utc).timestamp() * 1e3) + encountered_errors = 0 + logger = db_logger('DOWNLOADER', 'jobs', job_id=job.id()) + logger.info('Starting job') + + ydl = YDL(ydl_opts=dict(ydl_opts, **{'logger': YtdlLogger('DOWNLOADER', 'jobs', job.id())})) + playlist = ydl.playlist_contents(str(url)) + + if not playlist: + logger.fatal('URL is not a playlist!') + quit(1) + + playlist['entries'] = remove_duplicates_from_playlist(playlist['entries']) + + logger.info(f'Downloading item: "{playlist["title"]}" ({playlist["id"]}) {url}') + + download_queue = [] + for p, video in enumerate(playlist['entries']): + download_queue.append(video) + + playlist_ydl_opts = ydl_opts.copy() + + if len(download_queue): + with ProcessPoolExecutor(max_workers=opts.threads) as executor: + futures = {executor.submit(download_video, video, ydl_opts=playlist_ydl_opts, output_dir=Path(base_output), ignore_downloaded=ignore_downloaded, job=job) for video in download_queue} + for future in concurrent.futures.as_completed(futures): + try: + result = future.result() + error = False + + if result['downloaded_video_id']: + 
logger.info(result['downloaded_video_id'])
+
+                        for line in result['video_critical_err_msg_short']:
+                            encountered_errors += 1
+                            error = True
+                            logger.error(f"{result['video_id']} - {line}")
+
+                        for line in result['video_critical_err_msg']:
+                            encountered_errors += 1
+                            error = True
+                            logger.error(f"{result['video_id']} - {line}")
+
+                        for line in result['logger_msg']:
+                            logger.info(f"{result['video_id']} - {line}")
+
+                        if not error:
+                            insert_video(l_id, result['video_id'], result['video_url'])
+                    except Exception as exc:
+                        logger.error(f'Video download generated an exception: {exc}')
+            if encountered_errors == 0:
+                job.success(True)
+            else:
+                job.success(False)
+            job.finish()
+
+            # Update the job status
+            job_status[job.id()] = 'finished'
+            job.finish()
+            print('======================================================= finished =============')
+        except Empty:
+            break
+        except Exception as e:
+            logger = db_logger(name='DOWNLOADER', table='logs', console=True)
+            logger.fatal(f'failed with {e.__class__.__name__}: {e}. {traceback.format_exc()}')
+            break
diff --git a/server/process/mysql.py b/server/process/mysql.py
new file mode 100644
index 0000000..8979f2a
--- /dev/null
+++ b/server/process/mysql.py
@@ -0,0 +1,11 @@
+from server.mysql import query
+
+
+def insert_video(list_id, name, url):
+    # Insert through the shared query() helper from server.mysql, which opens its
+    # own connection and automatically commits INSERT statements.
+    query(
+        'INSERT INTO video_lists_content (list_id, name, url) VALUES (%s, %s, %s)',
+        (list_id, name, url)
+    )
+
diff --git a/server/process/threads.py b/server/process/threads.py
new file mode 100644
index 0000000..5ac7554
--- /dev/null
+++ b/server/process/threads.py
@@ -0,0 +1,132 @@
+import datetime
+import math
+import subprocess
+import time
+import traceback
+from pathlib import Path
+
+import yt_dlp as ydl_ydl
+from hurry.filesize import size
+from unidecode import unidecode
+
+import ydl.yt_dlp as ydl
+from server.mysql import db_logger
+from server.process.funcs import remove_special_chars_linux, sanitize_colored_string
+from server.process.ytlogging import YtdlLogger
+
+name_max = int(subprocess.check_output("getconf NAME_MAX /", shell=True).decode()) - 30
+
+
+def download_video(video, ydl_opts, output_dir, ignore_downloaded, job) -> dict:
+    output_dict = {'downloaded_video_id': None, 'video_id': video['id'], 'video_url': video['url'], 'video_critical_err_msg': [], 'video_critical_err_msg_short': [], 'status_msg': [], 'logger_msg': []}  # empty object
+    try:
+        job_progress = job.new_progress_thread(video['id'])
+        job_progress['start_time'] = int(datetime.datetime.now(datetime.timezone.utc).timestamp() * 1e3)
+
+        def progress_hook(d):
+            if d['status'] == 'downloading':  # Variables can be None if the download hasn't started yet.
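+                # The fields written to job_progress below (total, downloaded_bytes, percent, speed, size)
+                # are presumably what the jobs API reads back when reporting download status; the dict
+                # itself comes from job.new_progress_thread() above.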
+                if d.get('downloaded_bytes'):
+                    # We want total_bytes but it may not exist so total_bytes_estimate is good too
+                    if d.get('total_bytes'):
+                        job_progress['total'] = d.get('total_bytes')
+                    elif d.get('total_bytes_estimate'):
+                        job_progress['total'] = d.get('total_bytes_estimate')
+
+                    if job_progress.get('total'):  # If yt-dlp has this data
+                        job_progress['downloaded_bytes'] = int(d['downloaded_bytes'])
+                        if job_progress['total'] > 0:
+                            job_progress['percent'] = (job_progress['downloaded_bytes'] / job_progress['total']) * 100
+                            # bar.update(int(np.round(percent - bar.n)))  # If the progress bar doesn't end at 100% then round to 1 decimal place
+                        job_progress['speed'] = sanitize_colored_string(d['_speed_str']).strip(' ')
+                        job_progress['size'] = f"{size(d.get('downloaded_bytes'))}/{size(job_progress['total'])}"
+                        # bar.set_postfix({
+                        #     'speed': d['_speed_str'],
+                        #     'size': f"{size(d.get('downloaded_bytes'))}/{size(total)}",
+                        # })
+                    else:  # otherwise just use their internal variables
+                        # bar.set_postfix({
+                        #     'speed': d['_speed_str'],
+                        #     'size': f"{d['_downloaded_bytes_str'].strip()}/{d['_total_bytes_str'].strip()}",
+                        # })
+                        job_progress['speed'] = sanitize_colored_string(d['_speed_str']).strip(' ')
+                        job_progress['size'] = f"{d['_downloaded_bytes_str'].strip()}/{d['_total_bytes_str'].strip()}"
+
+        if not ignore_downloaded and (not video['channel_id'] or not video['channel'] or not video['channel_url']):
+            if video['duration'] or isinstance(video['view_count'], int):
+                # Sometimes videos don't have channel_id, channel, or channel_url but are actually valid. Like shorts.
+                pass
+            else:
+                output_dict['video_critical_err_msg_short'].append('unavailable.')
+                return output_dict
+
+        # Clean up foreign characters
+        video['title'] = unidecode(video['title'])
+
+        try:
+            # Get the video info
+            yt_dlp = ydl.YDL(dict(ydl_opts, **{'logger': YtdlLogger(name=video['id'], table='jobs', job_id=job.id())}))
+            video_n = yt_dlp.get_info(video['url'])
+
+            if not video_n:
+                output_dict['video_critical_err_msg_short'].append('failed to get info. Unavailable?')
+                return output_dict
+
+            video_n['url'] = video['url']
+            video = video_n
+            del video_n
+
+            # We created a new dict
+            video['title'] = unidecode(video['title'])
+            video['uploader'] = unidecode(video['uploader'])  # now this info is present since we fetched it
+
+            # TODO: do we also need to remove the @ char?
+            video_filename = remove_special_chars_linux(
+                ydl.get_output_templ(video_id=video['id'], title=video['title'], uploader=video['uploader'], uploader_id=video['uploader_id'], include_ext=False), special_chars=['/']
+            )
+
+            # Make sure the video title isn't too long
+            while len(video_filename) >= name_max - 3:  # -3 so that I can add ...
+                video['title'] = video['title'][:-1]
+                video_filename = remove_special_chars_linux(
+                    ydl.get_output_templ(
+                        video_id=video['id'],
+                        title=video['title'] + '...',
+                        uploader=video['uploader'],
+                        uploader_id=video['uploader_id'],
+                        include_ext=False
+                    ), special_chars=['/'])
+
+            base_path = str(Path(output_dir, video_filename))
+
+            ydl_opts['outtmpl'] = f"{base_path}.%(ext)s"
+
+            # try:
+            #     base_path = os.path.splitext(Path(output_dir, yt_dlp.prepare_filename(video)))[0]
+            # except AttributeError:
+            #     # Sometimes we won't be able to pull the video info so just use the video's ID.
+            #     base_path = output_dir / video['id']
+            ylogger = YtdlLogger(name=video['id'], table='jobs', job_id=job.id())
+            ydl_opts['logger'] = ylogger
+            ydl_opts['progress_hooks'] = [progress_hook]
+            with ydl_ydl.YoutubeDL(ydl_opts) as y:
+                error_code = y.download(video['url'])
+            # yt_dlp = ydl.YDL(ydl_opts)  # recreate the object with the correct logging path
+            # error_code = yt_dlp(video['url'])  # Do the download
+
+            if not error_code:
+                # start_time is stored in milliseconds, so convert it before computing the elapsed minutes.
+                elapsed = round(math.ceil(time.time() - job_progress['start_time'] / 1000) / 60, 2)
+                output_dict['logger_msg'].append(f"'{video['title']}' - Downloaded in {elapsed} min.")
+                output_dict['downloaded_video_id'] = video['id']
+            else:
+                output_dict['video_critical_err_msg'] = output_dict['video_critical_err_msg'] + ylogger.errors
+        except Exception as e:
+            output_dict['video_critical_err_msg'].append(f"EXCEPTION -> {traceback.format_exc()}")
+            logger = db_logger('DOWNLOADER', 'logs', console=True)
+            logger.fatal(f'failed with {e.__class__.__name__}: {e}. {traceback.format_exc()}')
+        job.del_progress_thread(video['id'])
+        job.add_completed(video['id'])
+        return output_dict
+    except Exception as e:
+        output_dict['video_critical_err_msg'].append(f"EXCEPTION -> {traceback.format_exc()}")
+        logger = db_logger('DOWNLOADER', 'logs', console=True)
+        logger.fatal(f'failed with {e.__class__.__name__}: {e}. {traceback.format_exc()}')
diff --git a/server/process/ytlogging.py b/server/process/ytlogging.py
new file mode 100644
index 0000000..e97af95
--- /dev/null
+++ b/server/process/ytlogging.py
@@ -0,0 +1,30 @@
+import logging
+
+from server.helpers.regex import strip_color_codes
+from server.mysql import db_logger
+
+
+class YtdlLogger(object):
+    def __init__(self, name: str, table, job_id: str = None):
+        self.logger = db_logger(name, table, job_id=job_id)
+        self.logger.setLevel(logging.DEBUG)
+        # Keep the error list per instance; a class-level list would be shared across every video.
+        self.errors = []
+
+    def debug(self, msg):
+        # print(msg)
+        self.logger.info(strip_color_codes(msg))
+
+    def info(self, msg):
+        # print(msg)
+        self.logger.info(strip_color_codes(msg))
+
+    def warning(self, msg):
+        # print(msg)
+        self.logger.warning(strip_color_codes(msg))
+
+    def error(self, msg):
+        # print(msg)
+        self.logger.error(strip_color_codes(msg))
+        self.errors.append(strip_color_codes(msg))
diff --git a/server/sql/database.sql b/server/sql/database.sql
new file mode 100644
index 0000000..a78bef2
--- /dev/null
+++ b/server/sql/database.sql
@@ -0,0 +1,152 @@
+-- phpMyAdmin SQL Dump
+-- version 5.1.1deb5ubuntu1
+-- https://www.phpmyadmin.net/
+--
+-- Host: localhost:3306
+-- Generation Time: Jul 15, 2023 at 02:56 PM
+-- Server version: 10.11.3-MariaDB-1:10.11.3+maria~ubu2204
+-- PHP Version: 8.1.2-1ubuntu2.11
+
+SET SQL_MODE = "NO_AUTO_VALUE_ON_ZERO";
+START TRANSACTION;
+SET time_zone = "+00:00";
+
+
+/*!40101 SET @OLD_CHARACTER_SET_CLIENT = @@CHARACTER_SET_CLIENT */;
+/*!40101 SET @OLD_CHARACTER_SET_RESULTS = @@CHARACTER_SET_RESULTS */;
+/*!40101 SET @OLD_COLLATION_CONNECTION = @@COLLATION_CONNECTION */;
+/*!40101 SET NAMES utf8mb4 */;
+
+--
+-- Database: `automated_ytdlp`
+--
+
+-- --------------------------------------------------------
+
+--
+-- Table structure for table `logging_job_output`
+--
+
+CREATE TABLE `logging_job_output`
+(
+    `job_id`  text     NOT NULL,
+    `level`   text     NOT NULL,
+    `time`    datetime NOT NULL DEFAULT current_timestamp(),
+    `message` longtext NOT NULL,
+    `id`      int(11)  NOT NULL
+) ENGINE = InnoDB
+  DEFAULT CHARSET = utf8mb4
+  COLLATE = utf8mb4_general_ci;
+
+-- --------------------------------------------------------
+
+--
+-- Table structure for table 
`logging_logs` +-- + +CREATE TABLE `logging_logs` +( + `level` text NOT NULL, + `time` datetime NOT NULL DEFAULT current_timestamp(), + `message` longtext NOT NULL, + `id` int(11) NOT NULL +) ENGINE = InnoDB + DEFAULT CHARSET = utf8mb4 + COLLATE = utf8mb4_general_ci; + +-- -------------------------------------------------------- + +-- +-- Table structure for table `video_lists` +-- + +CREATE TABLE `video_lists` +( + `name` text NOT NULL, + `url` text NOT NULL, + `added` datetime NOT NULL DEFAULT current_timestamp(), + `last_ran` datetime DEFAULT NULL, + `id` int(11) NOT NULL +) ENGINE = InnoDB + DEFAULT CHARSET = utf8mb4 + COLLATE = utf8mb4_general_ci; + +-- -------------------------------------------------------- + +-- +-- Table structure for table `video_lists_content` +-- + +CREATE TABLE `video_lists_content` +( + `list_id` int(11) NOT NULL, + `video_id` text NOT NULL, + `name` text NOT NULL, + `url` text NOT NULL, + `date_added` datetime NOT NULL DEFAULT current_timestamp(), + `id` int(11) NOT NULL +) ENGINE = InnoDB + DEFAULT CHARSET = utf8mb4 + COLLATE = utf8mb4_general_ci; + +-- +-- Indexes for dumped tables +-- + +-- +-- Indexes for table `logging_job_output` +-- +ALTER TABLE `logging_job_output` + ADD PRIMARY KEY (`id`); + +-- +-- Indexes for table `logging_logs` +-- +ALTER TABLE `logging_logs` + ADD PRIMARY KEY (`id`); + +-- +-- Indexes for table `video_lists` +-- +ALTER TABLE `video_lists` + ADD PRIMARY KEY (`id`); + +-- +-- Indexes for table `video_lists_content` +-- +ALTER TABLE `video_lists_content` + ADD PRIMARY KEY (`id`), + ADD UNIQUE KEY `video_id` (`video_id`) USING HASH; + +-- +-- AUTO_INCREMENT for dumped tables +-- + +-- +-- AUTO_INCREMENT for table `logging_job_output` +-- +ALTER TABLE `logging_job_output` + MODIFY `id` int(11) NOT NULL AUTO_INCREMENT; + +-- +-- AUTO_INCREMENT for table `logging_logs` +-- +ALTER TABLE `logging_logs` + MODIFY `id` int(11) NOT NULL AUTO_INCREMENT; + +-- +-- AUTO_INCREMENT for table `video_lists` +-- +ALTER TABLE `video_lists` + MODIFY `id` int(11) NOT NULL AUTO_INCREMENT; + +-- +-- AUTO_INCREMENT for table `video_lists_content` +-- +ALTER TABLE `video_lists_content` + MODIFY `id` int(11) NOT NULL AUTO_INCREMENT; +COMMIT; + +/*!40101 SET CHARACTER_SET_CLIENT = @OLD_CHARACTER_SET_CLIENT */; +/*!40101 SET CHARACTER_SET_RESULTS = @OLD_CHARACTER_SET_RESULTS */; +/*!40101 SET COLLATION_CONNECTION = @OLD_COLLATION_CONNECTION */; \ No newline at end of file diff --git a/ydl/yt_dlp.py b/ydl/yt_dlp.py index 28e7546..a912e94 100644 --- a/ydl/yt_dlp.py +++ b/ydl/yt_dlp.py @@ -5,6 +5,18 @@ from typing import Union import yt_dlp from mergedeep import merge +from server.process.funcs import restart_program + + +def do_update(): + print('Updating yt-dlp...') + updated = update_ytdlp() + if updated: + print('Restarting program...') + restart_program() + else: + print('Up to date.') + class YDL: def __init__(self, ydl_opts: dict = None, extra_ydlp_opts: dict = None): @@ -108,7 +120,7 @@ def update_ytdlp(): ) if f"Successfully installed {package_name}" in result.stdout: - # print(f"{package_name} was updated.") + # print(f"{package_name} was updated.") return True else: # print(f"{package_name} was not updated.")
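
For reviewers: below is a minimal, illustrative sketch (not part of the patch) of how the new server.mysql helpers are intended to fit together at startup. It assumes opts.mysql would normally be populated from the config file before any of these helpers run; the connection values shown here are placeholders.

    from server import opts
    from server.mysql import (check_if_database_exists, db_logger, init_db,
                              query, test_mysql_connection)

    # Placeholder credentials; the real program fills opts.mysql from its config at startup.
    opts.mysql.update({'host': '127.0.0.1', 'user': 'automated_ytdlp',
                       'password': 'changeme', 'database': 'automated_ytdlp'})

    ok, err = test_mysql_connection()
    if not ok:
        raise SystemExit(f'Could not connect to MySQL: {err}')

    exists, missing_tables = check_if_database_exists(partial=True)
    if not exists:
        init_db()  # runs server/sql/database.sql one statement at a time

    logger = db_logger('EXAMPLE', 'logs', console=True)
    logger.info('database ready')

    # query() returns cursor.fetchall() for SELECTs; dictionary=True yields dict rows.
    for row in query('SELECT `id`, `name`, `url` FROM video_lists', (), dictionary=True):
        logger.info(f"list {row['id']}: {row['name']} -> {row['url']}")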