Merge branch 'better-long-playlists' into 'main'

Better handling of (very) long playlists

See merge request team-zotify/zotify!13
This commit is contained in:
AraneusRota 2024-02-12 20:55:45 +00:00
commit fd94b2bb55
7 changed files with 211 additions and 102 deletions

View File

@ -105,15 +105,21 @@ The value is relative to the `ROOT_PATH`/`ROOT_PODCAST_PATH` directory and can c
| Placeholder | Description
|-----------------|--------------------------------
| {artist} | The song artist
| {album} | The song album
| {song_name} | The song name
| {release_year} | The song release year
| {disc_number} | The disc number
| {track_number} | The track_number
| {id} | The song id
| {track_id} | The track id
| {ext} | The file extension
| {artist} | Main song artist
| {artists} | All song artists (comma delimited)
| {album_artist} | Main album artist
| {album_artists} | All album artists (comma delimited)
| {album} | Song album name
| {song_name} | Song name
| {release_date} | Song/Album release date as precise as available (YYYY-MM-DD, YYYY-MM or YYYY)
| {release_year} | Song/Album release year
| {disc_number} | Disc number
| {track_number} | Track number
| {total_tracks} | Total tracks on album
| {id} | Song id
| {track_id} | Track id
| {isrc} | Song [ISRC](https://en.wikipedia.org/wiki/International_Standard_Recording_Code)
| {ext} | File extension
| {album_id} | (only when downloading albums) ID of the album
| {album_num} | (only when downloading albums) Incrementing track number
| {playlist} | (only when downloading playlists) Name of the playlist

View File

@ -5,8 +5,7 @@ from pathlib import Path
from zotify.album import download_album, download_artist_albums
from zotify.const import TRACK, NAME, ID, ARTIST, ARTISTS, ITEMS, TRACKS, EXPLICIT, ALBUM, ALBUMS, \
OWNER, PLAYLIST, PLAYLISTS, DISPLAY_NAME, TYPE
from zotify.loader import Loader
from zotify.playlist import get_playlist_songs, get_playlist_info, download_from_user_playlist, download_playlist
from zotify.playlist import download_from_user_playlist, download_playlist
from zotify.podcast import download_episode, get_show_episodes
from zotify.termoutput import Printer, PrintChannel
from zotify.track import download_track, get_saved_tracks, get_followed_artists
@ -100,26 +99,7 @@ def download_from_urls(urls: list[str]) -> bool:
download_album(album_id)
elif playlist_id is not None:
download = True
playlist_songs = get_playlist_songs(playlist_id)
name, _ = get_playlist_info(playlist_id)
enum = 1
char_num = len(str(len(playlist_songs)))
for song in playlist_songs:
if not song[TRACK][NAME] or not song[TRACK][ID]:
Printer.print(PrintChannel.SKIPS, '### SKIPPING: SONG DOES NOT EXIST ANYMORE ###' + "\n")
else:
if song[TRACK][TYPE] == "episode": # Playlist item is a podcast episode
download_episode(song[TRACK][ID])
else:
download_track('playlist', song[TRACK][ID], extra_keys=
{
'playlist_song_name': song[TRACK][NAME],
'playlist': name,
'playlist_num': str(enum).zfill(char_num),
'playlist_id': playlist_id,
'playlist_track_id': song[TRACK][ID]
})
enum += 1
download_playlist(playlist_id, 'playlist')
elif episode_id is not None:
download = True
download_episode(episode_id)
@ -307,4 +287,4 @@ def search(search_term):
elif dic['type'] == ARTIST:
download_artist_albums(dic[ID])
else:
download_playlist(dic)
download_playlist(dic[ID], 'extplaylist')

View File

@ -40,10 +40,16 @@ HREF = 'href'
ID = 'id'
EXTERNAL_IDS = 'external_ids'
ISRC = 'isrc'
URL = 'url'
RELEASE_DATE = 'release_date'
TOTAL_TRACKS = 'total_tracks'
IMAGES = 'images'
LIMIT = 'limit'

View File

@ -1,7 +1,8 @@
from zotify.const import ITEMS, ID, TRACK, NAME
from zotify.termoutput import Printer
from zotify.track import download_track
from zotify.utils import split_input
from zotify.const import ITEMS, ID, TRACK, NAME, TYPE
from zotify.podcast import download_episode
from zotify.termoutput import Printer, PrintChannel
from zotify.track import download_track, get_song_infos
from zotify.utils import split_input, get_previously_downloaded
from zotify.zotify import Zotify
MY_PLAYLISTS_URL = 'https://api.spotify.com/v1/me/playlists'
@ -46,16 +47,50 @@ def get_playlist_info(playlist_id):
return resp['name'].strip(), resp['owner']['display_name'].strip()
def download_playlist(playlist_id, mode: str):
    """Downloads all the songs from a playlist.

    Args:
        playlist_id: ID of the playlist. A playlist object (mapping with an
            ``ID`` key) is accepted as well, because callers that hold the
            full playlist object pass it through unchanged.
        mode: Output mode forwarded to ``download_track`` (callers in this
            project pass 'playlist' or 'extplaylist').
    """
    # Backward-compatible shim: unwrap a playlist object to its ID.
    if isinstance(playlist_id, dict):
        playlist_id = playlist_id[ID]

    playlist_name, _ = get_playlist_info(playlist_id)
    playlist_songs = [song for song in get_playlist_songs(playlist_id) if song[TRACK][ID]]
    previously_downloaded_song_ids = get_previously_downloaded()
    playlist_song_infos = get_song_infos([song[TRACK][ID] for song in playlist_songs])
    playlist_num_max_digits = len(str(len(playlist_songs)))
    # Loop-invariant, so read the setting once instead of once per song.
    skip_previously_downloaded = Zotify.CONFIG.get_skip_previously_downloaded()

    p_bar = Printer.progress(playlist_songs, unit='song', total=len(playlist_songs), unit_scale=True)
    previously_downloaded = 0
    # Skip messages are collected and printed after the loop
    # (presumably so they do not interleave with the progress bar).
    skip_messages_to_print = []

    for i, (song, info) in enumerate(zip(p_bar, playlist_song_infos)):
        song_name = song[TRACK][NAME]
        song_id = song[TRACK][ID]

        if not song_name or not song_id:
            skip_messages_to_print.append('### SKIPPED: A SONG DOES NOT EXIST ANYMORE ###')
        elif song[TRACK][TYPE] == "episode":  # Playlist item is a podcast episode
            # Episodes are dispatched before the track metadata is consulted:
            # `info` comes from the tracks endpoint and is not expected to
            # describe an episode.
            p_bar.set_description(song_name)
            download_episode(song_id)
        elif info is None or not info.is_playable:
            # `info is None` guards against a metadata lookup that yielded nothing for this ID.
            skip_messages_to_print.append(f'### SKIPPED: {song_name} (SONG IS UNAVAILABLE) ###')
        # Use the ID from the song info: the ID from the playlist may have been relinked and would
        # therefore not be in the download archive
        # (https://developer.spotify.com/documentation/general/guides/track-relinking-guide/)
        elif skip_previously_downloaded and info.scraped_song_id in previously_downloaded_song_ids:
            previously_downloaded += 1
        else:
            p_bar.set_description(song_name)
            download_track(
                mode,
                song_id,
                extra_keys={
                    'playlist_song_name': song_name,
                    'playlist': playlist_name,
                    'playlist_num': str(i + 1).zfill(playlist_num_max_digits),
                    'playlist_id': playlist_id,
                    'playlist_track_id': song_id
                },
                disable_progressbar=True,
                pre_fetched_song_info=info)

    if previously_downloaded > 0:
        Printer.print(PrintChannel.SKIPS, f'### SKIPPED: {previously_downloaded} songs (ALREADY DOWNLOADED ONCE) ###')

    for skip_message in skip_messages_to_print:
        Printer.print(PrintChannel.SKIPS, skip_message)
def download_from_user_playlist():
@ -78,6 +113,6 @@ def download_from_user_playlist():
for playlist_number in playlist_choices:
playlist = playlists[playlist_number - 1]
print(f'Downloading {playlist[NAME].strip()}')
download_playlist(playlist)
download_playlist(playlist, 'extplaylist')
print('\n**All playlists have been downloaded**\n')

35
zotify/song_info.py Normal file
View File

@ -0,0 +1,35 @@
from dataclasses import dataclass
from typing import List, Any
@dataclass(eq=False)
class SongInfo:
    """Metadata describing a single song.

    Plain value holder; fields are accepted positionally in the order declared
    below or by keyword. ``eq=False`` keeps identity-based equality and
    hashing, so instances behave like ordinary objects in sets and dict keys.
    """
    # Names of the song's artists; callers use the first entry as the main artist.
    artists: List[str]
    # Unprocessed artist objects the names were taken from.
    raw_artists: List[Any]
    # Names of the album's artists.
    album_artists: List[str]
    album_name: str
    name: str
    # URL of the cover image.
    image_url: str
    # Release date string and its leading year component.
    release_date: str
    release_year: str
    disc_number: int
    track_number: int
    # Number of tracks on the album.
    total_tracks: int
    # Song ID as reported in the metadata response (may differ from the requested ID).
    scraped_song_id: str
    isrc: str
    is_playable: bool
    duration_ms: int

View File

@ -1,16 +1,18 @@
from itertools import islice, chain
from pathlib import Path, PurePath
import math
import re
import time
import uuid
from typing import Any, Tuple, List
from typing import Any, Tuple, List, Iterable
from librespot.metadata import TrackId
import ffmpy
from zotify.const import TRACKS, ALBUM, GENRES, NAME, ITEMS, DISC_NUMBER, TRACK_NUMBER, IS_PLAYABLE, ARTISTS, IMAGES, URL, \
RELEASE_DATE, ID, TRACKS_URL, FOLLOWED_ARTISTS_URL, SAVED_TRACKS_URL, TRACK_STATS_URL, CODEC_MAP, EXT_MAP, DURATION_MS, \
HREF, ARTISTS, WIDTH
HREF, ARTISTS, WIDTH, TOTAL_TRACKS, EXTERNAL_IDS, ISRC
from zotify.song_info import SongInfo
from zotify.termoutput import Printer, PrintChannel
from zotify.utils import fix_filename, set_audio_tags, set_music_thumbnail, create_download_directory, \
get_directory_song_ids, add_to_directory_song_ids, get_previously_downloaded, add_to_archive, fmt_seconds
@ -42,41 +44,78 @@ def get_followed_artists() -> list:
resp = Zotify.invoke_url(FOLLOWED_ARTISTS_URL)[1]
for artist in resp[ARTISTS][ITEMS]:
artists.append(artist[ID])
return artists
def get_song_info(song_id: str) -> SongInfo:
    """Retrieves metadata for a single song.

    Raises:
        ValueError: if the response is invalid, cannot be parsed, or holds no
            data for ``song_id``.
    """
    with Loader(PrintChannel.PROGRESS_INFO, "Fetching track information..."):
        info = get_song_infos([song_id], disable_progressbar=True)[0]
    if info is None:
        raise ValueError(f'No track information returned by TRACKS_URL for song id: {song_id}')
    return info


def get_song_infos(song_ids: List[str], disable_progressbar=False) -> List[SongInfo]:
    """
    Retrieves metadata for downloaded songs, requesting the IDs in batches.

    Input and output list are same length (and order) because Spotify always
    returns a result for all IDs (null if not available). A null result is
    passed through as ``None`` at the matching position, so one unavailable ID
    does not fail the whole batch; callers must check for ``None``.

    Raises:
        ValueError: if a response lacks the TRACKS key or a non-null track
            entry cannot be parsed.
    """
    # Limit of 50 is defined by Spotify API (https://developer.spotify.com/documentation/web-api/reference/#/operations/get-several-tracks)
    tracks_request_limit = 50

    id_batches_comma_delimited = [
        ','.join(song_ids[start:start + tracks_request_limit])
        for start in range(0, len(song_ids), tracks_request_limit)
    ]
    id_batches_with_progress = Printer.progress(
        id_batches_comma_delimited,
        unit='batches',
        total=len(id_batches_comma_delimited),
        unit_scale=True,
        disable=disable_progressbar)
    id_batches_with_progress.set_description(f'Fetching track info (batch size: {tracks_request_limit})')

    def request_tracks(ids_comma_delimited: str):
        """Fetches one batch and returns (raw response, parsed response)."""
        raw_response, tracks_response = \
            Zotify.invoke_url(f'{TRACKS_URL}?ids={ids_comma_delimited}&market=from_token')
        if TRACKS not in tracks_response:
            raise ValueError(f'Invalid response from TRACKS_URL:\n{raw_response}')
        return raw_response, tracks_response

    def track_info(track, raw_response):
        """Builds a SongInfo from one track entry; a null entry yields None."""
        if track is None:
            return None
        try:
            album = track[ALBUM]
            release_date = album[RELEASE_DATE]
            # The widest available image is used as cover art.
            max_image = max(album[IMAGES], key=lambda image: image[WIDTH])
            return SongInfo(
                [artist[NAME] for artist in track[ARTISTS]],
                track[ARTISTS],
                [artist[NAME] for artist in album[ARTISTS]],
                album[NAME],
                track[NAME],
                max_image[URL],
                release_date,
                release_date.split('-')[0],
                track[DISC_NUMBER],
                track[TRACK_NUMBER],
                album[TOTAL_TRACKS],
                track[ID],
                track[EXTERNAL_IDS][ISRC],
                track[IS_PLAYABLE],
                track[DURATION_MS])
        except Exception as e:
            # Chain the cause so the original traceback is not lost.
            raise ValueError(f'Failed to parse TRACKS_URL response: {str(e)}\n{raw_response}') from e

    # All batches are requested first (driving the progress bar), then parsed.
    batch_responses = [request_tracks(batch) for batch in id_batches_with_progress]

    return [
        track_info(track, raw_response)
        for raw_response, tracks_response in batch_responses
        for track in tracks_response[TRACKS]
    ]
def get_song_genres(rawartists: List[str], track_name: str) -> List[str]:
@ -142,7 +181,7 @@ def get_song_duration(song_id: str) -> float:
return duration
def download_track(mode: str, track_id: str, extra_keys=None, disable_progressbar=False) -> None:
def download_track(mode: str, track_id: str, extra_keys=None, disable_progressbar=False, pre_fetched_song_info: SongInfo = None) -> None:
""" Downloads raw song audio from Spotify """
if extra_keys is None:
@ -153,26 +192,34 @@ def download_track(mode: str, track_id: str, extra_keys=None, disable_progressba
try:
output_template = Zotify.CONFIG.get_output(mode)
info = get_song_info(track_id) if pre_fetched_song_info is None else pre_fetched_song_info
song_name = fix_filename(info.artists[0]) + ' - ' + fix_filename(info.name)
(artists, raw_artists, album_name, name, image_url, release_year, disc_number,
track_number, scraped_song_id, is_playable, duration_ms) = get_song_info(track_id)
song_name = fix_filename(artists[0]) + ' - ' + fix_filename(name)
_track_number = str(info.track_number).zfill(2)
if 'multi_disc' in extra_keys and extra_keys['multi_disc']:
_track_number = f'{info.disc_number}{_track_number}'
for k in extra_keys:
output_template = output_template.replace("{"+k+"}", fix_filename(extra_keys[k]))
ext = EXT_MAP.get(Zotify.CONFIG.get_download_format().lower())
output_template = output_template.replace("{artist}", fix_filename(artists[0]))
output_template = output_template.replace("{album}", fix_filename(album_name))
output_template = output_template.replace("{song_name}", fix_filename(name))
output_template = output_template.replace("{release_year}", fix_filename(release_year))
output_template = output_template.replace("{disc_number}", fix_filename(disc_number))
output_template = output_template.replace("{track_number}", fix_filename(track_number))
output_template = output_template.replace("{id}", fix_filename(scraped_song_id))
output_template = output_template.replace("{track_id}", fix_filename(track_id))
output_template = output_template.replace("{ext}", ext)
output_template = output_template \
.replace("{artist}", fix_filename(info.artists[0])) \
.replace("{artists}", fix_filename(", ".join(info.artists))) \
.replace("{album_artist}", fix_filename(info.album_artists[0])) \
.replace("{album_artists}", fix_filename(", ".join(info.album_artists))) \
.replace("{album}", fix_filename(info.album_name)) \
.replace("{song_name}", fix_filename(info.name)) \
.replace("{release_date}", fix_filename(info.release_date)) \
.replace("{release_year}", fix_filename(info.release_year)) \
.replace("{disc_number}", fix_filename(info.disc_number)) \
.replace("{track_number}", fix_filename(info.track_number)) \
.replace("{total_tracks}", fix_filename(info.total_tracks)) \
.replace("{id}", fix_filename(info.scraped_song_id)) \
.replace("{isrc}", fix_filename(info.isrc)) \
.replace("{track_id}", fix_filename(track_id)) \
.replace("{ext}", ext)
filename = PurePath(Zotify.CONFIG.get_root_path()).joinpath(output_template)
filedir = PurePath(filename).parent
@ -182,8 +229,8 @@ def download_track(mode: str, track_id: str, extra_keys=None, disable_progressba
filename_temp = PurePath(Zotify.CONFIG.get_temp_download_dir()).joinpath(f'zotify_{str(uuid.uuid4())}_{track_id}.{ext}')
check_name = Path(filename).is_file() and Path(filename).stat().st_size
check_id = scraped_song_id in get_directory_song_ids(filedir)
check_all_time = scraped_song_id in get_previously_downloaded()
check_id = info.scraped_song_id in get_directory_song_ids(filedir)
check_all_time = info.scraped_song_id in get_previously_downloaded()
# a song with the same name is installed
if not check_id and check_name:
@ -205,7 +252,7 @@ def download_track(mode: str, track_id: str, extra_keys=None, disable_progressba
else:
try:
if not is_playable:
if not info.is_playable:
prepare_download_loader.stop()
Printer.print(PrintChannel.SKIPS, '\n### SKIPPING: ' + song_name + ' (SONG IS UNAVAILABLE) ###' + "\n")
else:
@ -218,8 +265,8 @@ def download_track(mode: str, track_id: str, extra_keys=None, disable_progressba
Printer.print(PrintChannel.SKIPS, '\n### SKIPPING: ' + song_name + ' (SONG ALREADY DOWNLOADED ONCE) ###' + "\n")
else:
if track_id != scraped_song_id:
track_id = scraped_song_id
if track_id != info.scraped_song_id:
track_id = info.scraped_song_id
track = TrackId.from_base62(track_id)
stream = Zotify.get_content_stream(track, Zotify.DOWNLOAD_QUALITY)
create_download_directory(filedir)
@ -239,20 +286,20 @@ def download_track(mode: str, track_id: str, extra_keys=None, disable_progressba
) as p_bar:
b = 0
while b < 5:
#for _ in range(int(total_size / Zotify.CONFIG.get_chunk_size()) + 2):
#for _ in range(int(total_size / Zotify.CONFIG.get_chunk_size()) + 2):
data = stream.input_stream.stream().read(Zotify.CONFIG.get_chunk_size())
p_bar.update(file.write(data))
downloaded += len(data)
b += 1 if data == b'' else 0
if Zotify.CONFIG.get_download_real_time():
delta_real = time.time() - time_start
delta_want = (downloaded / total_size) * (duration_ms/1000)
delta_want = (downloaded / total_size) * (info.duration_ms/1000)
if delta_want > delta_real:
time.sleep(delta_want - delta_real)
time_downloaded = time.time()
genres = get_song_genres(raw_artists, name)
genres = get_song_genres(info.raw_artists, info.name)
if(Zotify.CONFIG.get_download_lyrics()):
try:
@ -261,8 +308,8 @@ def download_track(mode: str, track_id: str, extra_keys=None, disable_progressba
Printer.print(PrintChannel.SKIPS, f"### Skipping lyrics for {song_name}: lyrics not available ###")
convert_audio_format(filename_temp)
try:
set_audio_tags(filename_temp, artists, genres, name, album_name, release_year, disc_number, track_number)
set_music_thumbnail(filename_temp, image_url)
set_audio_tags(filename_temp, info.artists, genres, info.name, info.album_name, info.release_year, info.disc_number, info.track_number)
set_music_thumbnail(filename_temp, info.image_url)
except Exception:
Printer.print(PrintChannel.ERRORS, "Unable to write metadata, ensure ffmpeg is installed and added to your PATH.")
@ -275,10 +322,10 @@ def download_track(mode: str, track_id: str, extra_keys=None, disable_progressba
# add song id to archive file
if Zotify.CONFIG.get_skip_previously_downloaded():
add_to_archive(scraped_song_id, PurePath(filename).name, artists[0], name)
add_to_archive(info.scraped_song_id, PurePath(filename).name, info.artists[0], info.name)
# add song id to download directory's .song_ids file
if not check_id:
add_to_directory_song_ids(filedir, scraped_song_id, PurePath(filename).name, artists[0], name)
add_to_directory_song_ids(filedir, info.scraped_song_id, PurePath(filename).name, info.artists[0], info.name)
if not Zotify.CONFIG.get_bulk_wait_time():
time.sleep(Zotify.CONFIG.get_bulk_wait_time())

View File

@ -6,7 +6,7 @@ import re
import subprocess
from enum import Enum
from pathlib import Path, PurePath
from typing import List, Tuple
from typing import List, Tuple, Set
import music_tag
import requests
@ -32,15 +32,15 @@ def create_download_directory(download_path: str) -> None:
pass
def get_previously_downloaded() -> Set[str]:
    """ Returns the set of IDs of all time downloaded songs.

    The IDs are read from the song archive file configured in Zotify.CONFIG:
    the first tab-separated field of every line is taken as the ID. An empty
    set is returned when the archive file does not exist.
    """
    archive_path = Zotify.CONFIG.get_song_archive()

    if not Path(archive_path).exists():
        # `{}` would create a dict, so the empty set is spelled out.
        return set()

    with open(archive_path, 'r', encoding='utf-8') as f:
        return {line.strip().split('\t')[0] for line in f}