[ie/ximalaya] Add VIP support (#10832)

Closes #6928
Authored by: xingchensong, seproDev

Co-authored-by: sepro <4618135+seproDev@users.noreply.github.com>
This commit is contained in:
Xingchen Song(宋星辰) 2024-09-14 07:16:34 +08:00 committed by GitHub
parent 25c1cdaa26
commit 3dfd720d09
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 85 additions and 6 deletions

View File

@ -1,7 +1,17 @@
import base64
import math import math
import time
from .common import InfoExtractor from .common import InfoExtractor
from ..utils import InAdvancePagedList, str_or_none, traverse_obj, try_call from .videa import VideaIE
from ..utils import (
InAdvancePagedList,
int_or_none,
str_or_none,
traverse_obj,
try_call,
update_url_query,
)
class XimalayaBaseIE(InfoExtractor): class XimalayaBaseIE(InfoExtractor):
@ -71,23 +81,92 @@ class XimalayaIE(XimalayaBaseIE):
'like_count': int, 'like_count': int,
}, },
}, },
{
# VIP-restricted audio
'url': 'https://www.ximalaya.com/sound/562111701',
'only_matching': True,
},
] ]
@staticmethod
def _decrypt_filename(file_id, seed):
cgstr = ''
key = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ/\\:._-1234567890'
for _ in key:
seed = float(int(211 * seed + 30031) % 65536)
r = int(seed / 65536 * len(key))
cgstr += key[r]
key = key.replace(key[r], '')
parts = file_id.split('*')
filename = ''.join(cgstr[int(part)] for part in parts if part.isdecimal())
if not filename.startswith('/'):
filename = '/' + filename
return filename
@staticmethod
def _decrypt_url_params(encrypted_params):
params = VideaIE.rc4(
base64.b64decode(encrypted_params), 'xkt3a41psizxrh9l').split('-')
# sign, token, timestamp
return params[1], params[2], params[3]
def _real_extract(self, url): def _real_extract(self, url):
scheme = 'https' if url.startswith('https') else 'http' scheme = 'https' if url.startswith('https') else 'http'
audio_id = self._match_id(url) audio_id = self._match_id(url)
audio_info_file = f'{scheme}://m.ximalaya.com/tracks/{audio_id}.json'
audio_info = self._download_json( audio_info = self._download_json(
audio_info_file, audio_id, f'{scheme}://m.ximalaya.com/tracks/{audio_id}.json', audio_id,
f'Downloading info json {audio_info_file}', 'Unable to download info file') 'Downloading info json', 'Unable to download info file')
formats = [{ formats = []
# NOTE: VIP-restricted audio
if audio_info.get('is_paid'):
ts = int(time.time())
vip_info = self._download_json(
f'{scheme}://mpay.ximalaya.com/mobile/track/pay/{audio_id}/{ts}',
audio_id, 'Downloading VIP info json', 'Unable to download VIP info file',
query={'device': 'pc', 'isBackend': 'true', '_': ts})
filename = self._decrypt_filename(vip_info['fileId'], vip_info['seed'])
sign, token, timestamp = self._decrypt_url_params(vip_info['ep'])
vip_url = update_url_query(
f'{vip_info["domain"]}/download/{vip_info["apiVersion"]}{filename}', {
'sign': sign,
'token': token,
'timestamp': timestamp,
'buy_key': vip_info['buyKey'],
'duration': vip_info['duration'],
})
fmt = {
'format_id': 'vip',
'url': vip_url,
'vcodec': 'none',
}
if '_preview_' in vip_url:
self.report_warning(
f'This tracks requires a VIP account. Using a sample instead. {self._login_hint()}')
fmt.update({
'format_note': 'Sample',
'preference': -10,
**traverse_obj(vip_info, {
'filesize': ('sampleLength', {int_or_none}),
'duration': ('sampleDuration', {int_or_none}),
}),
})
else:
fmt.update(traverse_obj(vip_info, {
'filesize': ('totalLength', {int_or_none}),
'duration': ('duration', {int_or_none}),
}))
fmt['abr'] = try_call(lambda: fmt['filesize'] * 8 / fmt['duration'] / 1024)
formats.append(fmt)
formats.extend([{
'format_id': f'{bps}k', 'format_id': f'{bps}k',
'url': audio_info[k], 'url': audio_info[k],
'abr': bps, 'abr': bps,
'vcodec': 'none', 'vcodec': 'none',
} for bps, k in ((24, 'play_path_32'), (64, 'play_path_64')) if audio_info.get(k)] } for bps, k in ((24, 'play_path_32'), (64, 'play_path_64')) if audio_info.get(k)])
thumbnails = [] thumbnails = []
for k in audio_info: for k in audio_info: