better handling of (very) long playlists

- Unified the playlist download code in app.py (for URLs) and playlist.py (for searches)
- new class SongInfo instead of using a tuple
- Fetch the info of all songs up front, before downloading a playlist; this drastically reduces the number of requests sent and makes the check for whether a song should be downloaded much faster
- Display the skipped-song messages after the download has finished, to reduce spam when resuming a download
This commit is contained in:
AraneusRota 2023-01-05 21:39:30 +01:00
parent a38ea10ba3
commit 6a2b3b37d6
5 changed files with 185 additions and 101 deletions

View File

@ -5,8 +5,7 @@ from pathlib import Path
from zotify.album import download_album, download_artist_albums
from zotify.const import TRACK, NAME, ID, ARTIST, ARTISTS, ITEMS, TRACKS, EXPLICIT, ALBUM, ALBUMS, \
OWNER, PLAYLIST, PLAYLISTS, DISPLAY_NAME, TYPE
from zotify.loader import Loader
from zotify.playlist import get_playlist_songs, get_playlist_info, download_from_user_playlist, download_playlist
from zotify.playlist import download_from_user_playlist, download_playlist
from zotify.podcast import download_episode, get_show_episodes
from zotify.termoutput import Printer, PrintChannel
from zotify.track import download_track, get_saved_tracks, get_followed_artists
@ -100,26 +99,7 @@ def download_from_urls(urls: list[str]) -> bool:
download_album(album_id)
elif playlist_id is not None:
download = True
playlist_songs = get_playlist_songs(playlist_id)
name, _ = get_playlist_info(playlist_id)
enum = 1
char_num = len(str(len(playlist_songs)))
for song in playlist_songs:
if not song[TRACK][NAME] or not song[TRACK][ID]:
Printer.print(PrintChannel.SKIPS, '### SKIPPING: SONG DOES NOT EXIST ANYMORE ###' + "\n")
else:
if song[TRACK][TYPE] == "episode": # Playlist item is a podcast episode
download_episode(song[TRACK][ID])
else:
download_track('playlist', song[TRACK][ID], extra_keys=
{
'playlist_song_name': song[TRACK][NAME],
'playlist': name,
'playlist_num': str(enum).zfill(char_num),
'playlist_id': playlist_id,
'playlist_track_id': song[TRACK][ID]
})
enum += 1
download_playlist(playlist_id, 'playlist')
elif episode_id is not None:
download = True
download_episode(episode_id)
@ -307,4 +287,4 @@ def search(search_term):
elif dic['type'] == ARTIST:
download_artist_albums(dic[ID])
else:
download_playlist(dic)
download_playlist(dic[ID], 'extplaylist')

View File

@ -1,7 +1,8 @@
from zotify.const import ITEMS, ID, TRACK, NAME
from zotify.termoutput import Printer
from zotify.track import download_track
from zotify.utils import split_input
from zotify.const import ITEMS, ID, TRACK, NAME, TYPE
from zotify.podcast import download_episode
from zotify.termoutput import Printer, PrintChannel
from zotify.track import download_track, get_song_infos
from zotify.utils import split_input, get_previously_downloaded
from zotify.zotify import Zotify
MY_PLAYLISTS_URL = 'https://api.spotify.com/v1/me/playlists'
@ -46,16 +47,50 @@ def get_playlist_info(playlist_id):
return resp['name'].strip(), resp['owner']['display_name'].strip()
def download_playlist(playlist_id, mode: str):
    """Downloads all the songs (and podcast episodes) from a playlist.

    The metadata of every song is fetched in batches before the first download
    starts, so that songs which are unavailable or were already downloaded can
    be skipped without issuing one request per song. Skip messages are
    collected and printed after the loop so they do not flood the output while
    the progress bar is running.

    :param playlist_id: ID of the playlist. For backward compatibility a
        playlist object (a dict containing the ``ID`` key) is accepted too.
    :param mode: download mode handed through to ``download_track`` (which uses
        it to select the output template), e.g. 'playlist' or 'extplaylist'.
    :raises ValueError: if the number of fetched song infos does not match the
        number of playlist songs.
    """
    # download_from_user_playlist() still passes the whole playlist object (the
    # calling convention from before this function took an ID), so unwrap it.
    if isinstance(playlist_id, dict):
        playlist_id = playlist_id[ID]

    playlist_name, _ = get_playlist_info(playlist_id)
    playlist_songs = [song for song in get_playlist_songs(playlist_id) if song[TRACK][ID]]

    # Read the option once and only load the archive when it is actually needed.
    skip_previously_downloaded = Zotify.CONFIG.get_skip_previously_downloaded()
    previously_downloaded_song_ids = get_previously_downloaded() if skip_previously_downloaded else set()

    playlist_song_infos = get_song_infos([song[TRACK][ID] for song in playlist_songs])
    # zip() below would silently drop songs (or pair songs with the wrong
    # metadata) if the two lists ever got out of step, so fail loudly instead.
    if len(playlist_song_infos) != len(playlist_songs):
        raise ValueError(
            f'Expected info for {len(playlist_songs)} songs but received {len(playlist_song_infos)}')

    playlist_num_max_digits = len(str(len(playlist_songs)))

    p_bar = Printer.progress(playlist_songs, unit='song', total=len(playlist_songs), unit_scale=True)
    previously_downloaded = 0
    skip_messages_to_print = []
    for playlist_num, (song, info) in enumerate(zip(p_bar, playlist_song_infos), start=1):
        song_name = song[TRACK][NAME]
        song_id = song[TRACK][ID]

        if not song_name or not song_id:
            skip_messages_to_print.append('### SKIPPED: A SONG DOES NOT EXIST ANYMORE ###')
        elif not info.is_playable:
            skip_messages_to_print.append(f'### SKIPPED: {song_name} (SONG IS UNAVAILABLE) ###')
        # Use ID from song info because ID from playlist is maybe relinked and therefore not in download archive
        # (https://developer.spotify.com/documentation/general/guides/track-relinking-guide/)
        elif skip_previously_downloaded and info.scraped_song_id in previously_downloaded_song_ids:
            previously_downloaded += 1
        else:
            p_bar.set_description(song_name)
            if song[TRACK][TYPE] == "episode":  # Playlist item is a podcast episode
                download_episode(song_id)
            else:
                download_track(
                    mode,
                    song_id,
                    extra_keys={
                        'playlist_song_name': song_name,
                        'playlist': playlist_name,
                        # Zero-padded so that file names sort in playlist order
                        'playlist_num': str(playlist_num).zfill(playlist_num_max_digits),
                        'playlist_id': playlist_id,
                        'playlist_track_id': song_id
                    },
                    disable_progressbar=True,
                    pre_fetched_song_info=info)

    # Report the skips only now, after the progress bar has finished.
    if previously_downloaded > 0:
        Printer.print(PrintChannel.SKIPS, f'### SKIPPED: {previously_downloaded} songs (ALREADY DOWNLOADED ONCE) ###')
    for skip_message in skip_messages_to_print:
        Printer.print(PrintChannel.SKIPS, skip_message)
def download_from_user_playlist():
@ -78,6 +113,6 @@ def download_from_user_playlist():
for playlist_number in playlist_choices:
playlist = playlists[playlist_number - 1]
print(f'Downloading {playlist[NAME].strip()}')
download_playlist(playlist)
download_playlist(playlist, 'extplaylist')
print('\n**All playlists have been downloaded**\n')

35
zotify/song_info.py Normal file
View File

@ -0,0 +1,35 @@
from dataclasses import dataclass
from typing import List, Any
# eq=False keeps the identity-based __eq__/__hash__ of a plain class, so
# instances stay hashable and are never compared field by field.
@dataclass(eq=False)
class SongInfo:
    """Metadata of a single track, as parsed from a TRACKS_URL response.

    The generated constructor takes the fields in the order they are declared
    below, positionally or by keyword.
    """

    # Display names of the track's artists
    artists: List[str]
    # Artist objects of the track exactly as returned by the API
    raw_artists: List[Any]
    # Display names of the album's artists
    album_artists: List[str]
    album_name: str
    # Track title
    name: str
    # URL of the widest available album image
    image_url: str
    release_date: str
    # First '-'-separated component of release_date
    release_year: str
    disc_number: int
    track_number: int
    # Number of tracks on the album
    total_tracks: int
    # Track ID as reported in the response; may differ from the requested ID
    scraped_song_id: str
    isrc: str
    is_playable: bool
    duration_ms: int

View File

@ -1,10 +1,10 @@
from functools import reduce
from itertools import islice, chain
from pathlib import Path, PurePath
import math
import re
import time
import uuid
from typing import Any, Tuple, List
from typing import Any, Tuple, List, Iterable
from librespot.metadata import TrackId
import ffmpy
@ -12,6 +12,7 @@ import ffmpy
from zotify.const import TRACKS, ALBUM, GENRES, NAME, ITEMS, DISC_NUMBER, TRACK_NUMBER, IS_PLAYABLE, ARTISTS, IMAGES, URL, \
RELEASE_DATE, ID, TRACKS_URL, FOLLOWED_ARTISTS_URL, SAVED_TRACKS_URL, TRACK_STATS_URL, CODEC_MAP, EXT_MAP, DURATION_MS, \
HREF, ARTISTS, WIDTH, TOTAL_TRACKS, EXTERNAL_IDS, ISRC
from zotify.song_info import SongInfo
from zotify.termoutput import Printer, PrintChannel
from zotify.utils import fix_filename, set_audio_tags, set_music_thumbnail, create_download_directory, \
get_directory_song_ids, add_to_directory_song_ids, get_previously_downloaded, add_to_archive, fmt_seconds
@ -43,45 +44,77 @@ def get_followed_artists() -> list:
resp = Zotify.invoke_url(FOLLOWED_ARTISTS_URL)[1]
for artist in resp[ARTISTS][ITEMS]:
artists.append(artist[ID])
return artists
def get_song_info(song_id: str) -> SongInfo:
    """ Retrieves metadata for a single song (thin wrapper around get_song_infos) """
    with Loader(PrintChannel.PROGRESS_INFO, "Fetching track information..."):
        return get_song_infos([song_id], disable_progressbar=True)[0]


def _parse_song_info(track) -> SongInfo:
    """ Builds a SongInfo from one track object of a TRACKS_URL response """
    album = track[ALBUM]
    release_date = album[RELEASE_DATE]
    # Use the widest of the available album images
    max_image = max(album[IMAGES], key=lambda i: i[WIDTH])

    return SongInfo(
        [artist[NAME] for artist in track[ARTISTS]],
        track[ARTISTS],
        [artist[NAME] for artist in album[ARTISTS]],
        album[NAME],
        track[NAME],
        max_image[URL],
        release_date,
        release_date.split('-')[0],
        track[DISC_NUMBER],
        track[TRACK_NUMBER],
        album[TOTAL_TRACKS],
        track[ID],
        track[EXTERNAL_IDS][ISRC],
        track[IS_PLAYABLE],
        track[DURATION_MS])


def get_song_infos(song_ids: List[str], disable_progressbar=False) -> List[SongInfo]:
    """
    Retrieves metadata for downloaded songs.
    Input and output list are same length (and order) because Spotify always returns a result for all IDs (null if not available)

    The IDs are requested in batches; all batches are requested before any response is parsed.

    :param song_ids: IDs of the songs to look up
    :param disable_progressbar: passed to Printer.progress as ``disable``
    :raises ValueError: if a response has no TRACKS entry or a track entry cannot be parsed
        (this includes a null entry, which fails on the first field access)
    """
    # Limit of 50 is defined by Spotify API (https://developer.spotify.com/documentation/web-api/reference/#/operations/get-several-tracks)
    tracks_request_limit = 50

    # Materialise first so that any iterable is accepted, not just sequences
    song_ids = list(song_ids)
    song_id_chunks_comma_delimited = [
        ','.join(song_ids[start:start + tracks_request_limit])
        for start in range(0, len(song_ids), tracks_request_limit)
    ]

    song_id_chunks_with_progress = Printer.progress(
        song_id_chunks_comma_delimited,
        unit='batches',
        total=len(song_id_chunks_comma_delimited),
        unit_scale=True,
        disable=disable_progressbar)
    song_id_chunks_with_progress.set_description(f'Fetching track info (batch size: {tracks_request_limit})')

    chunk_responses = [
        Zotify.invoke_url(f'{TRACKS_URL}?ids={chunk}&market=from_token')
        for chunk in song_id_chunks_with_progress
    ]

    song_infos = []
    for raw_response, tracks_response in chunk_responses:
        if TRACKS not in tracks_response:
            raise ValueError(f'Invalid response from TRACKS_URL:\n{raw_response}')
        for track in tracks_response[TRACKS]:
            try:
                song_infos.append(_parse_song_info(track))
            except Exception as e:
                # Keep the original error as the explicit cause for easier debugging
                raise ValueError(f'Failed to parse TRACKS_URL response: {str(e)}\n{raw_response}') from e
    return song_infos
def get_song_genres(rawartists: List[str], track_name: str) -> List[str]:
@ -147,7 +180,7 @@ def get_song_duration(song_id: str) -> float:
return duration
def download_track(mode: str, track_id: str, extra_keys=None, disable_progressbar=False) -> None:
def download_track(mode: str, track_id: str, extra_keys=None, disable_progressbar=False, pre_fetched_song_info: SongInfo = None) -> None:
""" Downloads raw song audio from Spotify """
if extra_keys is None:
@ -158,11 +191,12 @@ def download_track(mode: str, track_id: str, extra_keys=None, disable_progressba
try:
output_template = Zotify.CONFIG.get_output(mode)
info = get_song_info(track_id) if pre_fetched_song_info is None else pre_fetched_song_info
song_name = fix_filename(info.artists[0]) + ' - ' + fix_filename(info.name)
(artists, raw_artists, album_artists, album_name, name, image_url, release_date, release_year, disc_number,
track_number, total_tracks, scraped_song_id, isrc, is_playable, duration_ms) = get_song_info(track_id)
song_name = fix_filename(artists[0]) + ' - ' + fix_filename(name)
_track_number = str(info.track_number).zfill(2)
if 'multi_disc' in extra_keys and extra_keys['multi_disc']:
_track_number = f'{info.disc_number}{_track_number}'
for k in extra_keys:
output_template = output_template.replace("{"+k+"}", fix_filename(extra_keys[k]))
@ -170,19 +204,19 @@ def download_track(mode: str, track_id: str, extra_keys=None, disable_progressba
ext = EXT_MAP.get(Zotify.CONFIG.get_download_format().lower())
output_template = output_template \
.replace("{artist}", fix_filename(artists[0])) \
.replace("{artists}", fix_filename(", ".join(artists))) \
.replace("{album_artist}", fix_filename(album_artists[0])) \
.replace("{album_artists}", fix_filename(", ".join(album_artists))) \
.replace("{album}", fix_filename(album_name)) \
.replace("{song_name}", fix_filename(name)) \
.replace("{release_date}", fix_filename(release_date)) \
.replace("{release_year}", fix_filename(release_year)) \
.replace("{disc_number}", fix_filename(disc_number)) \
.replace("{track_number}", fix_filename(track_number)) \
.replace("{total_tracks}", fix_filename(total_tracks)) \
.replace("{id}", fix_filename(scraped_song_id)) \
.replace("{isrc}", fix_filename(isrc)) \
.replace("{artist}", fix_filename(info.artists[0])) \
.replace("{artists}", fix_filename(", ".join(info.artists))) \
.replace("{album_artist}", fix_filename(info.album_artists[0])) \
.replace("{album_artists}", fix_filename(", ".join(info.album_artists))) \
.replace("{album}", fix_filename(info.album_name)) \
.replace("{song_name}", fix_filename(info.name)) \
.replace("{release_date}", fix_filename(info.release_date)) \
.replace("{release_year}", fix_filename(info.release_year)) \
.replace("{disc_number}", fix_filename(info.disc_number)) \
.replace("{track_number}", fix_filename(info.track_number)) \
.replace("{total_tracks}", fix_filename(info.total_tracks)) \
.replace("{id}", fix_filename(info.scraped_song_id)) \
.replace("{isrc}", fix_filename(info.isrc)) \
.replace("{track_id}", fix_filename(track_id)) \
.replace("{ext}", ext)
@ -194,8 +228,8 @@ def download_track(mode: str, track_id: str, extra_keys=None, disable_progressba
filename_temp = PurePath(Zotify.CONFIG.get_temp_download_dir()).joinpath(f'zotify_{str(uuid.uuid4())}_{track_id}.{ext}')
check_name = Path(filename).is_file() and Path(filename).stat().st_size
check_id = scraped_song_id in get_directory_song_ids(filedir)
check_all_time = scraped_song_id in get_previously_downloaded()
check_id = info.scraped_song_id in get_directory_song_ids(filedir)
check_all_time = info.scraped_song_id in get_previously_downloaded()
# a song with the same name is installed
if not check_id and check_name:
@ -217,7 +251,7 @@ def download_track(mode: str, track_id: str, extra_keys=None, disable_progressba
else:
try:
if not is_playable:
if not info.is_playable:
prepare_download_loader.stop()
Printer.print(PrintChannel.SKIPS, '\n### SKIPPING: ' + song_name + ' (SONG IS UNAVAILABLE) ###' + "\n")
else:
@ -230,8 +264,8 @@ def download_track(mode: str, track_id: str, extra_keys=None, disable_progressba
Printer.print(PrintChannel.SKIPS, '\n### SKIPPING: ' + song_name + ' (SONG ALREADY DOWNLOADED ONCE) ###' + "\n")
else:
if track_id != scraped_song_id:
track_id = scraped_song_id
if track_id != info.scraped_song_id:
track_id = info.scraped_song_id
track = TrackId.from_base62(track_id)
stream = Zotify.get_content_stream(track, Zotify.DOWNLOAD_QUALITY)
create_download_directory(filedir)
@ -258,13 +292,13 @@ def download_track(mode: str, track_id: str, extra_keys=None, disable_progressba
b += 1 if data == b'' else 0
if Zotify.CONFIG.get_download_real_time():
delta_real = time.time() - time_start
delta_want = (downloaded / total_size) * (duration_ms/1000)
delta_want = (downloaded / total_size) * (info.duration_ms/1000)
if delta_want > delta_real:
time.sleep(delta_want - delta_real)
time_downloaded = time.time()
genres = get_song_genres(raw_artists, name)
genres = get_song_genres(info.raw_artists, info.name)
if(Zotify.CONFIG.get_download_lyrics()):
try:
@ -273,8 +307,8 @@ def download_track(mode: str, track_id: str, extra_keys=None, disable_progressba
Printer.print(PrintChannel.SKIPS, f"### Skipping lyrics for {song_name}: lyrics not available ###")
convert_audio_format(filename_temp)
try:
set_audio_tags(filename_temp, artists, genres, name, album_name, release_year, disc_number, track_number)
set_music_thumbnail(filename_temp, image_url)
set_audio_tags(filename_temp, info.artists, genres, info.name, info.album_name, info.release_year, info.disc_number, info.track_number)
set_music_thumbnail(filename_temp, info.image_url)
except Exception:
Printer.print(PrintChannel.ERRORS, "Unable to write metadata, ensure ffmpeg is installed and added to your PATH.")
@ -287,10 +321,10 @@ def download_track(mode: str, track_id: str, extra_keys=None, disable_progressba
# add song id to archive file
if Zotify.CONFIG.get_skip_previously_downloaded():
add_to_archive(scraped_song_id, PurePath(filename).name, artists[0], name)
add_to_archive(info.scraped_song_id, PurePath(filename).name, info.artists[0], info.name)
# add song id to download directory's .song_ids file
if not check_id:
add_to_directory_song_ids(filedir, scraped_song_id, PurePath(filename).name, artists[0], name)
add_to_directory_song_ids(filedir, info.scraped_song_id, PurePath(filename).name, info.artists[0], info.name)
if not Zotify.CONFIG.get_bulk_wait_time():
time.sleep(Zotify.CONFIG.get_bulk_wait_time())

View File

@ -6,7 +6,7 @@ import re
import subprocess
from enum import Enum
from pathlib import Path, PurePath
from typing import List, Tuple
from typing import List, Tuple, Set
import music_tag
import requests
@ -32,15 +32,15 @@ def create_download_directory(download_path: str) -> None:
pass
def get_previously_downloaded() -> Set[str]:
    """ Returns the set of IDs of all time downloaded songs

    The IDs are read from the song archive file configured in Zotify.CONFIG:
    the first tab-separated column of every line. An empty set is returned
    when the archive file does not exist.
    """
    archive_path = Zotify.CONFIG.get_song_archive()

    # NOTE: the fallback must be set(), not {} - the latter is an empty dict
    if not Path(archive_path).exists():
        return set()

    with open(archive_path, 'r', encoding='utf-8') as f:
        return {line.strip().split('\t')[0] for line in f}