Experimental OAuth login
This commit is contained in:
parent
b361976504
commit
446c8c2a52
2
Pipfile
2
Pipfile
|
@ -8,7 +8,7 @@ librespot = {git = "git+https://github.com/kokarare1212/librespot-python"}
|
|||
music-tag = {git = "git+https://zotify.xyz/zotify/music-tag"}
|
||||
mutagen = "*"
|
||||
pillow = "*"
|
||||
pwinput = "*"
|
||||
pkce = "*"
|
||||
requests = "*"
|
||||
tqdm = "*"
|
||||
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
{
|
||||
"_meta": {
|
||||
"hash": {
|
||||
"sha256": "9cf0a0fbfd691c64820035a5c12805f868ae1d2401630b9f68b67b936f5e7892"
|
||||
"sha256": "9a41882e9856db99151e4f1a3712d4b1562f2997e9a51cfcaf473335cd2db74c"
|
||||
},
|
||||
"pipfile-spec": 6,
|
||||
"requires": {
|
||||
|
@ -247,6 +247,15 @@
|
|||
"markers": "python_version >= '3.8'",
|
||||
"version": "==10.4.0"
|
||||
},
|
||||
"pkce": {
|
||||
"hashes": [
|
||||
"sha256:55927e24c7d403b2491ebe182b95d9dcb1807643243d47e3879fbda5aad4471d",
|
||||
"sha256:9775fd76d8a743d39b87df38af1cd04a58c9b5a5242d5a6350ef343d06814ab6"
|
||||
],
|
||||
"index": "pypi",
|
||||
"markers": "python_version >= '3'",
|
||||
"version": "==1.0.3"
|
||||
},
|
||||
"protobuf": {
|
||||
"hashes": [
|
||||
"sha256:06059eb6953ff01e56a25cd02cca1a9649a75a7e65397b5b9b4e929ed71d10cf",
|
||||
|
@ -277,13 +286,6 @@
|
|||
"markers": "python_version >= '3.7'",
|
||||
"version": "==3.20.1"
|
||||
},
|
||||
"pwinput": {
|
||||
"hashes": [
|
||||
"sha256:ca1a8bd06e28872d751dbd4132d8637127c25b408ea3a349377314a5491426f3"
|
||||
],
|
||||
"index": "pypi",
|
||||
"version": "==1.0.3"
|
||||
},
|
||||
"pycryptodomex": {
|
||||
"hashes": [
|
||||
"sha256:0daad007b685db36d977f9de73f61f8da2a7104e20aca3effd30752fd56f73e1",
|
||||
|
|
|
@ -1,78 +1,76 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from http.server import BaseHTTPRequestHandler, HTTPServer
|
||||
from pathlib import Path
|
||||
from threading import Thread
|
||||
from typing import Any
|
||||
from time import time_ns
|
||||
from urllib.parse import urlencode, urlparse, parse_qs
|
||||
|
||||
from librespot.audio import AudioKeyManager, CdnManager
|
||||
from librespot.audio.decoders import VorbisOnlyAudioQuality
|
||||
from librespot.core import ApiClient, ApResolver, PlayableContentFeeder
|
||||
from librespot.core import Session as LibrespotSession
|
||||
from librespot.audio.storage import ChannelManager
|
||||
from librespot.cache import CacheManager
|
||||
from librespot.core import (
|
||||
ApResolver,
|
||||
DealerClient,
|
||||
EventService,
|
||||
PlayableContentFeeder,
|
||||
SearchManager,
|
||||
ApiClient as LibrespotApiClient,
|
||||
Session as LibrespotSession,
|
||||
TokenProvider as LibrespotTokenProvider,
|
||||
)
|
||||
from librespot.mercury import MercuryClient
|
||||
from librespot.metadata import EpisodeId, PlayableId, TrackId
|
||||
from pwinput import pwinput
|
||||
from requests import HTTPError, get
|
||||
from librespot.proto import Authentication_pb2 as Authentication
|
||||
from pkce import generate_code_verifier, get_code_challenge
|
||||
from requests import HTTPError, get, post
|
||||
|
||||
from zotify.loader import Loader
|
||||
from zotify.playable import Episode, Track
|
||||
from zotify.utils import Quality
|
||||
|
||||
API_URL = "https://api.sp" + "otify.com/v1/"
|
||||
|
||||
|
||||
class Api(ApiClient):
|
||||
def __init__(self, session: Session):
|
||||
super(Api, self).__init__(session)
|
||||
self.__session = session
|
||||
|
||||
def __get_token(self) -> str:
|
||||
return (
|
||||
self.__session.tokens()
|
||||
.get_token(
|
||||
"playlist-read-private", # Private playlists
|
||||
"user-follow-read", # Followed artists
|
||||
"user-library-read", # Liked tracks/episodes/etc.
|
||||
"user-read-private", # Country
|
||||
)
|
||||
.access_token
|
||||
)
|
||||
|
||||
def invoke_url(
|
||||
self,
|
||||
url: str,
|
||||
params: dict = {},
|
||||
limit: int = 20,
|
||||
offset: int = 0,
|
||||
) -> dict:
|
||||
"""
|
||||
Requests data from API
|
||||
Args:
|
||||
url: API URL and to get data from
|
||||
params: parameters to be sent in the request
|
||||
limit: The maximum number of items in the response
|
||||
offset: The offset of the items returned
|
||||
Returns:
|
||||
Dictionary representation of JSON response
|
||||
"""
|
||||
headers = {
|
||||
"Authorization": f"Bearer {self.__get_token()}",
|
||||
"Accept": "application/json",
|
||||
"Accept-Language": self.__session.language(),
|
||||
"app-platform": "WebPlayer",
|
||||
}
|
||||
params["limit"] = limit
|
||||
params["offset"] = offset
|
||||
|
||||
response = get(API_URL + url, headers=headers, params=params)
|
||||
data = response.json()
|
||||
|
||||
try:
|
||||
raise HTTPError(
|
||||
f"{url}\nAPI Error {data['error']['status']}: {data['error']['message']}"
|
||||
)
|
||||
except KeyError:
|
||||
return data
|
||||
AUTH_URL = "https://accounts.sp" + "otify.com/"
|
||||
REDIRECT_URI = "http://127.0.0.1:4381/login"
|
||||
CLIENT_ID = "65b70807" + "3fc0480e" + "a92a0772" + "33ca87bd"
|
||||
SCOPES = [
|
||||
"app-remote-control",
|
||||
"playlist-modify",
|
||||
"playlist-modify-private",
|
||||
"playlist-modify-public",
|
||||
"playlist-read",
|
||||
"playlist-read-collaborative",
|
||||
"playlist-read-private",
|
||||
"streaming",
|
||||
"ugc-image-upload",
|
||||
"user-follow-modify",
|
||||
"user-follow-read",
|
||||
"user-library-modify",
|
||||
"user-library-read",
|
||||
"user-modify",
|
||||
"user-modify-playback-state",
|
||||
"user-modify-private",
|
||||
"user-personalized",
|
||||
"user-read-birthdate",
|
||||
"user-read-currently-playing",
|
||||
"user-read-email",
|
||||
"user-read-play-history",
|
||||
"user-read-playback-position",
|
||||
"user-read-playback-state",
|
||||
"user-read-private",
|
||||
"user-read-recently-played",
|
||||
"user-top-read",
|
||||
]
|
||||
|
||||
|
||||
class Session(LibrespotSession):
|
||||
def __init__(
|
||||
self, session_builder: LibrespotSession.Builder, language: str = "en"
|
||||
self,
|
||||
session_builder: LibrespotSession.Builder,
|
||||
token: TokenProvider.StoredToken,
|
||||
language: str = "en",
|
||||
) -> None:
|
||||
"""
|
||||
Authenticates user, saves credentials to a file and generates api token.
|
||||
|
@ -91,10 +89,10 @@ class Session(LibrespotSession):
|
|||
),
|
||||
ApResolver.get_random_accesspoint(),
|
||||
)
|
||||
self.__token = token
|
||||
self.__language = language
|
||||
self.connect()
|
||||
self.authenticate(session_builder.login_credentials)
|
||||
self.__api = Api(self)
|
||||
self.__language = language
|
||||
|
||||
@staticmethod
|
||||
def from_file(cred_file: Path | str, language: str = "en") -> Session:
|
||||
|
@ -114,20 +112,16 @@ class Session(LibrespotSession):
|
|||
.build()
|
||||
)
|
||||
session = LibrespotSession.Builder(conf).stored_file(str(cred_file))
|
||||
return Session(session, language)
|
||||
token = session.login_credentials.auth_data # TODO: this is wrong
|
||||
return Session(session, token, language)
|
||||
|
||||
@staticmethod
|
||||
def from_userpass(
|
||||
username: str,
|
||||
password: str,
|
||||
save_file: Path | str | None = None,
|
||||
language: str = "en",
|
||||
def from_oauth(
|
||||
save_file: Path | str | None = None, language: str = "en"
|
||||
) -> Session:
|
||||
"""
|
||||
Creates session using username & password
|
||||
Creates a session using OAuth2
|
||||
Args:
|
||||
username: Account username
|
||||
password: Account password
|
||||
save_file: Path to save login credentials to, optional.
|
||||
language: ISO 639-1 language code for API responses
|
||||
Returns:
|
||||
|
@ -142,26 +136,19 @@ class Session(LibrespotSession):
|
|||
else:
|
||||
builder.set_store_credentials(False)
|
||||
|
||||
session = LibrespotSession.Builder(builder.build()).user_pass(
|
||||
username, password
|
||||
)
|
||||
return Session(session, language)
|
||||
|
||||
@staticmethod
|
||||
def from_prompt(
|
||||
save_file: Path | str | None = None, language: str = "en"
|
||||
) -> Session:
|
||||
"""
|
||||
Creates a session with username + password supplied from CLI prompt
|
||||
Args:
|
||||
save_file: Path to save login credentials to, optional.
|
||||
language: ISO 639-1 language code for API responses
|
||||
Returns:
|
||||
Zotify session
|
||||
"""
|
||||
# TODO: this should be done in App()
|
||||
username = input("Username: ")
|
||||
password = pwinput(prompt="Password: ", mask="*")
|
||||
return Session.from_userpass(username, password, save_file, language)
|
||||
auth = OAuth()
|
||||
print(f"Click on the following link to login:\n{auth.get_authorization_url()}")
|
||||
token = auth.await_token()
|
||||
|
||||
session = LibrespotSession.Builder(builder.build())
|
||||
session.login_credentials = Authentication.LoginCredentials(
|
||||
username=username,
|
||||
typ=Authentication.AuthenticationType.values()[3],
|
||||
auth_data=token.access_token.encode(),
|
||||
)
|
||||
return Session(session, token, language)
|
||||
|
||||
def __get_playable(
|
||||
self, playable_id: PlayableId, quality: Quality
|
||||
|
@ -201,9 +188,9 @@ class Session(LibrespotSession):
|
|||
self.api(),
|
||||
)
|
||||
|
||||
def api(self) -> Api:
|
||||
"""Returns API Client"""
|
||||
return self.__api
|
||||
def token(self) -> TokenProvider.StoredToken:
|
||||
"""Returns API token"""
|
||||
return self.__token
|
||||
|
||||
def language(self) -> str:
|
||||
"""Returns session language"""
|
||||
|
@ -212,3 +199,189 @@ class Session(LibrespotSession):
|
|||
def is_premium(self) -> bool:
|
||||
"""Returns users premium account status"""
|
||||
return self.get_user_attribute("type") == "premium"
|
||||
|
||||
def authenticate(self, credential: Authentication.LoginCredentials) -> None:
|
||||
"""
|
||||
Log in to the thing
|
||||
Args:
|
||||
credential: Account login information
|
||||
"""
|
||||
self.__authenticate_partial(credential, False)
|
||||
with self.__auth_lock:
|
||||
self.__mercury_client = MercuryClient(self)
|
||||
self.__token_provider = TokenProvider(self)
|
||||
self.__audio_key_manager = AudioKeyManager(self)
|
||||
self.__channel_manager = ChannelManager(self)
|
||||
self.__api = ApiClient(self)
|
||||
self.__cdn_manager = CdnManager(self)
|
||||
self.__content_feeder = PlayableContentFeeder(self)
|
||||
self.__cache_manager = CacheManager(self)
|
||||
self.__dealer_client = DealerClient(self)
|
||||
self.__search = SearchManager(self)
|
||||
self.__event_service = EventService(self)
|
||||
self.__auth_lock_bool = False
|
||||
self.__auth_lock.notify_all()
|
||||
self.dealer().connect()
|
||||
self.mercury().interested_in("sp" + "otify:user:attributes:update", self)
|
||||
self.dealer().add_message_listener(
|
||||
self, ["hm://connect-state/v1/connect/logout"]
|
||||
)
|
||||
|
||||
|
||||
class ApiClient(LibrespotApiClient):
|
||||
def __init__(self, session: Session):
|
||||
super(ApiClient, self).__init__(session)
|
||||
self.__session = session
|
||||
|
||||
def invoke_url(
|
||||
self,
|
||||
url: str,
|
||||
params: dict[str, Any] = {},
|
||||
limit: int = 20,
|
||||
offset: int = 0,
|
||||
) -> dict[str, Any]:
|
||||
"""
|
||||
Requests data from API
|
||||
Args:
|
||||
url: API URL and to get data from
|
||||
params: parameters to be sent in the request
|
||||
limit: The maximum number of items in the response
|
||||
offset: The offset of the items returned
|
||||
Returns:
|
||||
Dictionary representation of JSON response
|
||||
"""
|
||||
headers = {
|
||||
"Authorization": f"Bearer {self.__get_token()}",
|
||||
"Accept": "application/json",
|
||||
"Accept-Language": self.__session.language(),
|
||||
"app-platform": "WebPlayer",
|
||||
}
|
||||
params["limit"] = limit
|
||||
params["offset"] = offset
|
||||
|
||||
response = get(API_URL + url, headers=headers, params=params)
|
||||
data = response.json()
|
||||
|
||||
try:
|
||||
raise HTTPError(
|
||||
f"{url}\nAPI Error {data['error']['status']}: {data['error']['message']}"
|
||||
)
|
||||
except KeyError:
|
||||
return data
|
||||
|
||||
def __get_token(self) -> str:
|
||||
return (
|
||||
self.__session.tokens()
|
||||
.get_token(
|
||||
"playlist-read-private", # Private playlists
|
||||
"user-follow-read", # Followed artists
|
||||
"user-library-read", # Liked tracks/episodes/etc.
|
||||
"user-read-private", # Country
|
||||
)
|
||||
.access_token
|
||||
)
|
||||
|
||||
|
||||
class TokenProvider(LibrespotTokenProvider):
|
||||
def __init__(self, session: Session):
|
||||
super(TokenProvider, self).__init__(session)
|
||||
self._session = session
|
||||
|
||||
def get_token(self, *scopes) -> TokenProvider.StoredToken:
|
||||
return self._session.token()
|
||||
|
||||
class StoredToken(LibrespotTokenProvider.StoredToken):
|
||||
def __init__(self, obj):
|
||||
self.timestamp = int(time_ns() / 1000)
|
||||
self.expires_in = int(obj["expires_in"])
|
||||
self.access_token = obj["access_token"]
|
||||
self.scopes = obj["scope"].split()
|
||||
self.refresh_token = obj["refresh_token"]
|
||||
|
||||
|
||||
class OAuth:
|
||||
__code_verifier: str
|
||||
__server_thread: Thread
|
||||
__token: TokenProvider.StoredToken
|
||||
|
||||
def __init__(self):
|
||||
self.__server_thread = Thread(target=self.__run_server)
|
||||
self.__server_thread.start()
|
||||
|
||||
def get_authorization_url(self) -> str:
|
||||
self.__code_verifier = generate_code_verifier()
|
||||
code_challenge = get_code_challenge(self.__code_verifier)
|
||||
params = {
|
||||
"client_id": CLIENT_ID,
|
||||
"response_type": "code",
|
||||
"redirect_uri": REDIRECT_URI,
|
||||
"scope": ",".join(SCOPES),
|
||||
"code_challenge_method": "S256",
|
||||
"code_challenge": code_challenge,
|
||||
}
|
||||
return f"{AUTH_URL}authorize?{urlencode(params)}"
|
||||
|
||||
def await_token(self) -> TokenProvider.StoredToken:
|
||||
self.__server_thread.join()
|
||||
return self.__token
|
||||
|
||||
def set_token(self, code: str) -> None:
|
||||
token_url = f"{AUTH_URL}api/token"
|
||||
headers = {"Content-Type": "application/x-www-form-urlencoded"}
|
||||
body = {
|
||||
"grant_type": "authorization_code",
|
||||
"code": code,
|
||||
"redirect_uri": REDIRECT_URI,
|
||||
"client_id": CLIENT_ID,
|
||||
"code_verifier": self.__code_verifier,
|
||||
}
|
||||
response = post(token_url, headers=headers, data=body)
|
||||
if response.status_code != 200:
|
||||
raise IOError(
|
||||
f"Error fetching token: {response.status_code}, {response.text}"
|
||||
)
|
||||
self.__token = TokenProvider.StoredToken(response.json())
|
||||
|
||||
def __run_server(self) -> None:
|
||||
server_address = ("127.0.0.1", 4381)
|
||||
httpd = self.OAuthHTTPServer(server_address, self.RequestHandler, self)
|
||||
httpd.authenticator = self
|
||||
httpd.serve_forever()
|
||||
|
||||
class OAuthHTTPServer(HTTPServer):
|
||||
authenticator: OAuth
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
server_address: tuple[str, int],
|
||||
RequestHandlerClass: type[BaseHTTPRequestHandler],
|
||||
authenticator: OAuth,
|
||||
):
|
||||
super().__init__(server_address, RequestHandlerClass)
|
||||
self.authenticator = authenticator
|
||||
|
||||
class RequestHandler(BaseHTTPRequestHandler):
|
||||
def log_message(self, format: str, *args):
|
||||
return
|
||||
|
||||
def do_GET(self) -> None:
|
||||
parsed_path = urlparse(self.path)
|
||||
query_params = parse_qs(parsed_path.query)
|
||||
code = query_params.get("code")
|
||||
|
||||
if code:
|
||||
if isinstance(self.server, OAuth.OAuthHTTPServer):
|
||||
self.server.authenticator.set_token(code[0])
|
||||
self.send_response(200)
|
||||
self.send_header("Content-type", "text/html")
|
||||
self.end_headers()
|
||||
self.wfile.write(
|
||||
b"Authorization successful. You can close this window."
|
||||
)
|
||||
Thread(target=self.server.shutdown).start()
|
||||
else:
|
||||
self.send_response(400)
|
||||
self.send_header("Content-type", "text/html")
|
||||
self.end_headers()
|
||||
self.wfile.write(b"Authorization code not found.")
|
||||
Thread(target=self.server.shutdown).start()
|
||||
|
|
|
@ -147,21 +147,24 @@ class App:
|
|||
Logger(self.__config)
|
||||
|
||||
# Create session
|
||||
if args.username != "" and args.password != "":
|
||||
self.__session = Session.from_userpass(
|
||||
args.username,
|
||||
args.password,
|
||||
self.__config.credentials_path,
|
||||
self.__config.language,
|
||||
)
|
||||
elif self.__config.credentials_path.is_file():
|
||||
self.__session = Session.from_file(
|
||||
self.__config.credentials_path, self.__config.language
|
||||
)
|
||||
else:
|
||||
self.__session = Session.from_prompt(
|
||||
self.__config.credentials_path, self.__config.language
|
||||
)
|
||||
# if args.username != "" and args.password != "":
|
||||
# self.__session = Session.from_userpass(
|
||||
# args.username,
|
||||
# args.password,
|
||||
# self.__config.credentials_path,
|
||||
# self.__config.language,
|
||||
# )
|
||||
# elif self.__config.credentials_path.is_file():
|
||||
# self.__session = Session.from_file(
|
||||
# self.__config.credentials_path, self.__config.language
|
||||
# )
|
||||
# else:
|
||||
# self.__session = Session.from_prompt(
|
||||
# self.__config.credentials_path, self.__config.language
|
||||
# )
|
||||
self.__session = Session.from_oauth(
|
||||
self.__config.credentials_path, self.__config.language
|
||||
)
|
||||
|
||||
# Get items to download
|
||||
ids = self.get_selection(args)
|
||||
|
@ -268,6 +271,7 @@ class App:
|
|||
LogChannel.SKIPS,
|
||||
f'Skipping "{track.name}": Already exists at specified output',
|
||||
)
|
||||
continue
|
||||
|
||||
# Download track
|
||||
with Logger.progress(
|
||||
|
|
|
@ -5,7 +5,7 @@ from librespot.metadata import (
|
|||
ShowId,
|
||||
)
|
||||
|
||||
from zotify import Api
|
||||
from zotify import ApiClient
|
||||
from zotify.config import Config
|
||||
from zotify.utils import MetadataEntry, PlayableData, PlayableType, bytes_to_base62
|
||||
|
||||
|
@ -13,12 +13,12 @@ from zotify.utils import MetadataEntry, PlayableData, PlayableType, bytes_to_bas
|
|||
class Collection:
|
||||
playables: list[PlayableData] = []
|
||||
|
||||
def __init__(self, b62_id: str, api: Api, config: Config = Config()):
|
||||
def __init__(self, b62_id: str, api: ApiClient, config: Config = Config()):
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class Album(Collection):
|
||||
def __init__(self, b62_id: str, api: Api, config: Config = Config()):
|
||||
def __init__(self, b62_id: str, api: ApiClient, config: Config = Config()):
|
||||
album = api.get_metadata_4_album(AlbumId.from_base62(b62_id))
|
||||
for disc in album.disc:
|
||||
for track in disc.track:
|
||||
|
@ -33,7 +33,7 @@ class Album(Collection):
|
|||
|
||||
|
||||
class Artist(Collection):
|
||||
def __init__(self, b62_id: str, api: Api, config: Config = Config()):
|
||||
def __init__(self, b62_id: str, api: ApiClient, config: Config = Config()):
|
||||
artist = api.get_metadata_4_artist(ArtistId.from_base62(b62_id))
|
||||
for album_group in (
|
||||
artist.album_group
|
||||
|
@ -55,7 +55,7 @@ class Artist(Collection):
|
|||
|
||||
|
||||
class Show(Collection):
|
||||
def __init__(self, b62_id: str, api: Api, config: Config = Config()):
|
||||
def __init__(self, b62_id: str, api: ApiClient, config: Config = Config()):
|
||||
show = api.get_metadata_4_show(ShowId.from_base62(b62_id))
|
||||
for episode in show.episode:
|
||||
self.playables.append(
|
||||
|
@ -69,7 +69,7 @@ class Show(Collection):
|
|||
|
||||
|
||||
class Playlist(Collection):
|
||||
def __init__(self, b62_id: str, api: Api, config: Config = Config()):
|
||||
def __init__(self, b62_id: str, api: ApiClient, config: Config = Config()):
|
||||
playlist = api.get_playlist(PlaylistId(b62_id))
|
||||
for i in range(len(playlist.contents.items)):
|
||||
item = playlist.contents.items[i]
|
||||
|
@ -111,7 +111,7 @@ class Playlist(Collection):
|
|||
|
||||
|
||||
class Track(Collection):
|
||||
def __init__(self, b62_id: str, api: Api, config: Config = Config()):
|
||||
def __init__(self, b62_id: str, api: ApiClient, config: Config = Config()):
|
||||
self.playables.append(
|
||||
PlayableData(
|
||||
PlayableType.TRACK, b62_id, config.album_library, config.output_album
|
||||
|
@ -120,7 +120,7 @@ class Track(Collection):
|
|||
|
||||
|
||||
class Episode(Collection):
|
||||
def __init__(self, b62_id: str, api: Api, config: Config = Config()):
|
||||
def __init__(self, b62_id: str, api: ApiClient, config: Config = Config()):
|
||||
self.playables.append(
|
||||
PlayableData(
|
||||
PlayableType.EPISODE,
|
||||
|
|
|
@ -55,7 +55,9 @@ class LocalFile:
|
|||
"-i",
|
||||
str(self.__path),
|
||||
]
|
||||
path = self.__path.parent.joinpath(self.__path.name.rsplit(".", 1)[0] + ext)
|
||||
path = self.__path.parent.joinpath(
|
||||
self.__path.name.rsplit(".", 1)[0] + "." + ext
|
||||
)
|
||||
if self.__path == path:
|
||||
raise TranscodingError(
|
||||
f"Cannot overwrite source, target file {path} already exists."
|
||||
|
@ -97,7 +99,7 @@ class LocalFile:
|
|||
try:
|
||||
f[m.name] = m.value
|
||||
except KeyError:
|
||||
pass
|
||||
pass # TODO
|
||||
try:
|
||||
f.save()
|
||||
except OggVorbisHeaderError:
|
||||
|
|
|
@ -1,9 +1,9 @@
|
|||
from math import floor
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from librespot.core import PlayableContentFeeder
|
||||
from librespot.metadata import AlbumId
|
||||
from librespot.proto import Metadata_pb2 as Metadata
|
||||
from librespot.structure import GeneralAudioStream
|
||||
from librespot.util import bytes_to_hex
|
||||
from requests import get
|
||||
|
@ -57,7 +57,7 @@ class Lyrics:
|
|||
|
||||
|
||||
class Playable:
|
||||
cover_images: list[Any]
|
||||
cover_images: list[Metadata.Image]
|
||||
input_stream: GeneralAudioStream
|
||||
metadata: list[MetadataEntry]
|
||||
name: str
|
||||
|
@ -165,6 +165,7 @@ class Track(PlayableContentFeeder.LoadedStream, Playable):
|
|||
MetadataEntry("popularity", int(self.popularity * 255) / 100),
|
||||
MetadataEntry("track_number", self.number, str(self.number).zfill(2)),
|
||||
MetadataEntry("title", self.name),
|
||||
MetadataEntry("track", self.name),
|
||||
MetadataEntry("year", date.year),
|
||||
MetadataEntry(
|
||||
"replaygain_track_gain", self.normalization_data.track_gain_db, ""
|
||||
|
|
Loading…
Reference in New Issue