Added PyDub

This commit is contained in:
Mark Qvist 2024-06-04 03:19:24 +02:00
parent 446181aa58
commit 53479d4700
11 changed files with 3344 additions and 0 deletions

1
sbapp/pydub/__init__.py Normal file

@ -0,0 +1 @@
from .audio_segment import AudioSegment

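The top-level package re-exports AudioSegment, so callers never need to reach into submodules for the core class. A minimal import sketch (sbapp.pydub is the vendored path added by this commit; upstream the package is simply pydub):

from sbapp.pydub import AudioSegment  # vendored path in this repository
half_second = AudioSegment.silent(duration=500)  # silent() is standard pydub API
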
1399
sbapp/pydub/audio_segment.py Normal file

File diff suppressed because it is too large

341
sbapp/pydub/effects.py Normal file

@ -0,0 +1,341 @@
import sys
import math
import array
from .utils import (
db_to_float,
ratio_to_db,
register_pydub_effect,
make_chunks,
audioop,
get_min_max_value
)
from .silence import split_on_silence
from .exceptions import TooManyMissingFrames, InvalidDuration
if sys.version_info >= (3, 0):
xrange = range
@register_pydub_effect
def apply_mono_filter_to_each_channel(seg, filter_fn):
n_channels = seg.channels
channel_segs = seg.split_to_mono()
channel_segs = [filter_fn(channel_seg) for channel_seg in channel_segs]
out_data = seg.get_array_of_samples()
for channel_i, channel_seg in enumerate(channel_segs):
for sample_i, sample in enumerate(channel_seg.get_array_of_samples()):
index = (sample_i * n_channels) + channel_i
out_data[index] = sample
return seg._spawn(out_data)
@register_pydub_effect
def normalize(seg, headroom=0.1):
"""
headroom is how close to the maximum volume to boost the signal up to (specified in dB)
"""
peak_sample_val = seg.max
# if the max is 0, this audio segment is silent, and can't be normalized
if peak_sample_val == 0:
return seg
target_peak = seg.max_possible_amplitude * db_to_float(-headroom)
needed_boost = ratio_to_db(target_peak / peak_sample_val)
return seg.apply_gain(needed_boost)
@register_pydub_effect
def speedup(seg, playback_speed=1.5, chunk_size=150, crossfade=25):
# we will keep audio in 150ms chunks since one waveform at 20Hz is 50ms long
# (20 Hz is the lowest frequency audible to humans)
# portion of AUDIO TO KEEP. if playback speed is 1.25 we keep 80% (0.8) and
# discard 20% (0.2)
atk = 1.0 / playback_speed
if playback_speed < 2.0:
# removing less than half the audio - keep the chunk size fixed
# and compute how many ms to remove from each chunk
ms_to_remove_per_chunk = int(chunk_size * (1 - atk) / atk)
else:
# removing half the audio or more - remove fixed-size chunks and
# shrink the portion we keep instead
ms_to_remove_per_chunk = int(chunk_size)
chunk_size = int(atk * chunk_size / (1 - atk))
# the crossfade cannot be longer than the amount of audio we're removing
crossfade = min(crossfade, ms_to_remove_per_chunk - 1)
# DEBUG
#print("chunk: {0}, rm: {1}".format(chunk_size, ms_to_remove_per_chunk))
chunks = make_chunks(seg, chunk_size + ms_to_remove_per_chunk)
if len(chunks) < 2:
raise Exception("Could not speed up AudioSegment, it was too short {2:0.2f}s for the current settings:\n{0}ms chunks at {1:0.1f}x speedup".format(
chunk_size, playback_speed, seg.duration_seconds))
# we'll actually truncate a bit less than we calculated to make up for the
# crossfade between chunks
ms_to_remove_per_chunk -= crossfade
# we don't want to truncate the last chunk since it is not guaranteed to be
# the full chunk length
last_chunk = chunks[-1]
chunks = [chunk[:-ms_to_remove_per_chunk] for chunk in chunks[:-1]]
out = chunks[0]
for chunk in chunks[1:]:
out = out.append(chunk, crossfade=crossfade)
out += last_chunk
return out
@register_pydub_effect
def strip_silence(seg, silence_len=1000, silence_thresh=-16, padding=100):
if padding > silence_len:
raise InvalidDuration("padding cannot be longer than silence_len")
chunks = split_on_silence(seg, silence_len, silence_thresh, padding)
crossfade = padding / 2
if not len(chunks):
return seg[0:0]
seg = chunks[0]
for chunk in chunks[1:]:
seg = seg.append(chunk, crossfade=crossfade)
return seg
@register_pydub_effect
def compress_dynamic_range(seg, threshold=-20.0, ratio=4.0, attack=5.0, release=50.0):
"""
Keyword Arguments:
threshold - default: -20.0
Threshold in dBFS. default of -20.0 means -20dB relative to the
maximum possible volume. 0dBFS is the maximum possible value so
all values for this argument should be negative.
ratio - default: 4.0
Compression ratio. Audio louder than the threshold will be
reduced to 1/ratio the volume. A ratio of 4.0 is equivalent to
a setting of 4:1 in a pro-audio compressor like the Waves C1.
attack - default: 5.0
Attack in milliseconds. How long it should take for the compressor
to kick in once the audio has exceeded the threshold.
release - default: 50.0
Release in milliseconds. How long it should take for the compressor
to stop compressing after the audio has fallen below the threshold.
For an overview of Dynamic Range Compression, and more detailed explanation
of the related terminology, see:
http://en.wikipedia.org/wiki/Dynamic_range_compression
"""
thresh_rms = seg.max_possible_amplitude * db_to_float(threshold)
look_frames = int(seg.frame_count(ms=attack))
def rms_at(frame_i):
return seg.get_sample_slice(frame_i - look_frames, frame_i).rms
def db_over_threshold(rms):
if rms == 0: return 0.0
db = ratio_to_db(rms / thresh_rms)
return max(db, 0)
output = []
# amount to reduce the volume of the audio by (in dB)
attenuation = 0.0
attack_frames = seg.frame_count(ms=attack)
release_frames = seg.frame_count(ms=release)
for i in xrange(int(seg.frame_count())):
rms_now = rms_at(i)
# with a ratio of 4.0 this means the volume will exceed the threshold by
# 1/4 the amount (of dB) that it would otherwise
max_attenuation = (1 - (1.0 / ratio)) * db_over_threshold(rms_now)
attenuation_inc = max_attenuation / attack_frames
attenuation_dec = max_attenuation / release_frames
if rms_now > thresh_rms and attenuation <= max_attenuation:
attenuation += attenuation_inc
attenuation = min(attenuation, max_attenuation)
else:
attenuation -= attenuation_dec
attenuation = max(attenuation, 0)
frame = seg.get_frame(i)
if attenuation != 0.0:
frame = audioop.mul(frame,
seg.sample_width,
db_to_float(-attenuation))
output.append(frame)
return seg._spawn(data=b''.join(output))
# Invert the phase of the signal.
@register_pydub_effect
def invert_phase(seg, channels=(1, 1)):
"""
channels - specifies which channel(s) to invert the phase of (left, right, or both).
Note that inverting only one channel requires a stereo AudioSegment; passing a
mono segment with channels other than (1, 1) raises an exception.
"""
if channels == (1, 1):
inverted = audioop.mul(seg._data, seg.sample_width, -1.0)
return seg._spawn(data=inverted)
else:
if seg.channels == 2:
left, right = seg.split_to_mono()
else:
raise Exception("Can't implicitly convert an AudioSegment with " + str(seg.channels) + " channels to stereo.")
if channels == (1, 0):
left = left.invert_phase()
else:
right = right.invert_phase()
return seg.from_mono_audiosegments(left, right)
# High and low pass filters based on implementation found on Stack Overflow:
# http://stackoverflow.com/questions/13882038/implementing-simple-high-and-low-pass-filters-in-c
@register_pydub_effect
def low_pass_filter(seg, cutoff):
"""
cutoff - Frequency (in Hz) where higher frequency signal will begin to
be reduced by 6dB per octave (doubling in frequency) above this point
"""
RC = 1.0 / (cutoff * 2 * math.pi)
dt = 1.0 / seg.frame_rate
alpha = dt / (RC + dt)
original = seg.get_array_of_samples()
filteredArray = array.array(seg.array_type, original)
frame_count = int(seg.frame_count())
last_val = [0] * seg.channels
for i in range(seg.channels):
last_val[i] = filteredArray[i] = original[i]
for i in range(1, frame_count):
for j in range(seg.channels):
offset = (i * seg.channels) + j
last_val[j] = last_val[j] + (alpha * (original[offset] - last_val[j]))
filteredArray[offset] = int(last_val[j])
return seg._spawn(data=filteredArray)
@register_pydub_effect
def high_pass_filter(seg, cutoff):
"""
cutoff - Frequency (in Hz) where lower frequency signal will begin to
be reduced by 6dB per octave (doubling in frequency) below this point
"""
RC = 1.0 / (cutoff * 2 * math.pi)
dt = 1.0 / seg.frame_rate
alpha = RC / (RC + dt)
minval, maxval = get_min_max_value(seg.sample_width * 8)
original = seg.get_array_of_samples()
filteredArray = array.array(seg.array_type, original)
frame_count = int(seg.frame_count())
last_val = [0] * seg.channels
for i in range(seg.channels):
last_val[i] = filteredArray[i] = original[i]
for i in range(1, frame_count):
for j in range(seg.channels):
offset = (i * seg.channels) + j
offset_minus_1 = ((i-1) * seg.channels) + j
last_val[j] = alpha * (last_val[j] + original[offset] - original[offset_minus_1])
filteredArray[offset] = int(min(max(last_val[j], minval), maxval))
return seg._spawn(data=filteredArray)
@register_pydub_effect
def pan(seg, pan_amount):
"""
pan_amount should be between -1.0 (100% left) and +1.0 (100% right)
When pan_amount == 0.0 the left/right balance is not changed.
Panning does not alter the *perceived* loudness, but since loudness
is decreasing on one side, the other side needs to get louder to
compensate. When panned hard left, the left channel will be 3dB louder.
"""
if not -1.0 <= pan_amount <= 1.0:
raise ValueError("pan_amount should be between -1.0 (100% left) and +1.0 (100% right)")
max_boost_db = ratio_to_db(2.0)
boost_db = abs(pan_amount) * max_boost_db
boost_factor = db_to_float(boost_db)
reduce_factor = db_to_float(max_boost_db) - boost_factor
reduce_db = ratio_to_db(reduce_factor)
# Cut boost in half (max boost == 3dB) - in reality 2 speakers
# do not sum to a full 6 dB.
boost_db = boost_db / 2.0
if pan_amount < 0:
return seg.apply_gain_stereo(boost_db, reduce_db)
else:
return seg.apply_gain_stereo(reduce_db, boost_db)
@register_pydub_effect
def apply_gain_stereo(seg, left_gain=0.0, right_gain=0.0):
"""
left_gain - amount of gain to apply to the left channel (in dB)
right_gain - amount of gain to apply to the right channel (in dB)
note: mono audio segments will be converted to stereo
"""
if seg.channels == 1:
left = right = seg
elif seg.channels == 2:
left, right = seg.split_to_mono()
l_mult_factor = db_to_float(left_gain)
r_mult_factor = db_to_float(right_gain)
left_data = audioop.mul(left._data, left.sample_width, l_mult_factor)
left_data = audioop.tostereo(left_data, left.sample_width, 1, 0)
right_data = audioop.mul(right._data, right.sample_width, r_mult_factor)
right_data = audioop.tostereo(right_data, right.sample_width, 0, 1)
output = audioop.add(left_data, right_data, seg.sample_width)
return seg._spawn(data=output,
overrides={'channels': 2,
'frame_width': 2 * seg.sample_width})

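Every function above is attached to AudioSegment by @register_pydub_effect, so the effects chain naturally as method calls. A minimal usage sketch, assuming the vendored sbapp.pydub import path and a hypothetical input file speech.wav:

from sbapp.pydub import AudioSegment
from sbapp.pydub import effects  # noqa: F401 -- importing runs the @register_pydub_effect decorators

seg = AudioSegment.from_file("speech.wav")                    # hypothetical input file
seg = seg.normalize(headroom=0.1)                             # peak at -0.1 dBFS
seg = seg.compress_dynamic_range(threshold=-20.0, ratio=4.0)  # gentle 4:1 compression
seg = seg.speedup(playback_speed=1.25)                        # 25% faster via chunked crossfades
seg = seg.pan(-0.5)                                           # halfway left; mono input becomes stereo
seg.export("speech_processed.wav", format="wav")
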
32
sbapp/pydub/exceptions.py Normal file

@ -0,0 +1,32 @@
class PydubException(Exception):
"""
Base class for any Pydub exception
"""
class TooManyMissingFrames(PydubException):
pass
class InvalidDuration(PydubException):
pass
class InvalidTag(PydubException):
pass
class InvalidID3TagVersion(PydubException):
pass
class CouldntDecodeError(PydubException):
pass
class CouldntEncodeError(PydubException):
pass
class MissingAudioParameter(PydubException):
pass

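All of these derive from PydubException, so callers can catch the base class or a specific failure mode. A short sketch with a hypothetical file path:

from sbapp.pydub import AudioSegment
from sbapp.pydub.exceptions import CouldntDecodeError

try:
    seg = AudioSegment.from_file("maybe_corrupt.ogg")  # hypothetical path
except CouldntDecodeError as exc:
    print("ffmpeg could not decode the file:", exc)
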
142
sbapp/pydub/generators.py Normal file

@ -0,0 +1,142 @@
"""
Each generator will return float samples from -1.0 to 1.0, which can be
converted to actual audio with 8, 16, 24, or 32 bit depth using the
SignalGenerator.to_audio_segment() method (on any of its subclasses).
See Wikipedia's "waveform" page for info on some of the generators included
here: http://en.wikipedia.org/wiki/Waveform
"""
import math
import array
import itertools
import random
from .audio_segment import AudioSegment
from .utils import (
db_to_float,
get_frame_width,
get_array_type,
get_min_max_value
)
class SignalGenerator(object):
def __init__(self, sample_rate=44100, bit_depth=16):
self.sample_rate = sample_rate
self.bit_depth = bit_depth
def to_audio_segment(self, duration=1000.0, volume=0.0):
"""
Duration in milliseconds
(default: 1 second)
Volume in DB relative to maximum amplitude
(default 0.0 dBFS, which is the maximum value)
"""
minval, maxval = get_min_max_value(self.bit_depth)
sample_width = get_frame_width(self.bit_depth)
array_type = get_array_type(self.bit_depth)
gain = db_to_float(volume)
sample_count = int(self.sample_rate * (duration / 1000.0))
sample_data = (int(val * maxval * gain) for val in self.generate())
sample_data = itertools.islice(sample_data, 0, sample_count)
data = array.array(array_type, sample_data)
try:
data = data.tobytes()
except AttributeError:  # Python 2 arrays have tostring() instead of tobytes()
data = data.tostring()
return AudioSegment(data=data, metadata={
"channels": 1,
"sample_width": sample_width,
"frame_rate": self.sample_rate,
"frame_width": sample_width,
})
def generate(self):
raise NotImplementedError("SignalGenerator subclasses must implement the generate() method, and *should not* call the superclass implementation.")
class Sine(SignalGenerator):
def __init__(self, freq, **kwargs):
super(Sine, self).__init__(**kwargs)
self.freq = freq
def generate(self):
sine_of = (self.freq * 2 * math.pi) / self.sample_rate
sample_n = 0
while True:
yield math.sin(sine_of * sample_n)
sample_n += 1
class Pulse(SignalGenerator):
def __init__(self, freq, duty_cycle=0.5, **kwargs):
super(Pulse, self).__init__(**kwargs)
self.freq = freq
self.duty_cycle = duty_cycle
def generate(self):
sample_n = 0
# in samples
cycle_length = self.sample_rate / float(self.freq)
pulse_length = cycle_length * self.duty_cycle
while True:
if (sample_n % cycle_length) < pulse_length:
yield 1.0
else:
yield -1.0
sample_n += 1
class Square(Pulse):
def __init__(self, freq, **kwargs):
kwargs['duty_cycle'] = 0.5
super(Square, self).__init__(freq, **kwargs)
class Sawtooth(SignalGenerator):
def __init__(self, freq, duty_cycle=1.0, **kwargs):
super(Sawtooth, self).__init__(**kwargs)
self.freq = freq
self.duty_cycle = duty_cycle
def generate(self):
sample_n = 0
# in samples
cycle_length = self.sample_rate / float(self.freq)
midpoint = cycle_length * self.duty_cycle
ascend_length = midpoint
descend_length = cycle_length - ascend_length
while True:
cycle_position = sample_n % cycle_length
if cycle_position < midpoint:
yield (2 * cycle_position / ascend_length) - 1.0
else:
yield 1.0 - (2 * (cycle_position - midpoint) / descend_length)
sample_n += 1
class Triangle(Sawtooth):
def __init__(self, freq, **kwargs):
kwargs['duty_cycle'] = 0.5
super(Triangle, self).__init__(freq, **kwargs)
class WhiteNoise(SignalGenerator):
def generate(self):
while True:
yield (random.random() * 2) - 1.0

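Each generator yields floats in [-1.0, 1.0] indefinitely; to_audio_segment() scales them to the requested bit depth and slices off the requested duration, so tones can be synthesized and mixed without any input files. A minimal sketch:

from sbapp.pydub.generators import Sine, WhiteNoise

tone = Sine(440, sample_rate=44100, bit_depth=16).to_audio_segment(duration=2000, volume=-3.0)
noise = WhiteNoise().to_audio_segment(duration=2000, volume=-30.0)
mixed = tone.overlay(noise)  # overlay() is standard AudioSegment API
mixed.export("tone_with_noise.wav", format="wav")
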
14
sbapp/pydub/logging_utils.py Normal file

@ -0,0 +1,14 @@
"""
"""
import logging
converter_logger = logging.getLogger("pydub.converter")
def log_conversion(conversion_command):
converter_logger.debug("subprocess.call(%s)", repr(conversion_command))
def log_subprocess_output(output):
if output:
for line in output.rstrip().splitlines():
converter_logger.debug('subprocess output: %s', line.rstrip())

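Since the module just routes conversion commands through the standard logging framework, raising the pydub.converter logger to DEBUG is enough to see every ffmpeg/avconv invocation:

import logging

logging.basicConfig(level=logging.DEBUG)
logging.getLogger("pydub.converter").setLevel(logging.DEBUG)
# subsequent AudioSegment.from_file() / export() calls now log their converter command lines
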
71
sbapp/pydub/playback.py Normal file

@ -0,0 +1,71 @@
"""
Support for playing AudioSegments. simpleaudio is used if it's installed,
falling back to pyaudio and finally to ffplay. pyaudio is a *much* nicer
solution than shelling out to ffplay, but is tricky to install. See my notes
on installing pyaudio in a virtualenv (on OSX 10.10): https://gist.github.com/jiaaro/9767512210a1d80a8a0d
"""
import subprocess
from tempfile import NamedTemporaryFile
from .utils import get_player_name, make_chunks
def _play_with_ffplay(seg):
PLAYER = get_player_name()
with NamedTemporaryFile("w+b", suffix=".wav") as f:
seg.export(f.name, "wav")
subprocess.call([PLAYER, "-nodisp", "-autoexit", "-hide_banner", f.name])
def _play_with_pyaudio(seg):
import pyaudio
p = pyaudio.PyAudio()
stream = p.open(format=p.get_format_from_width(seg.sample_width),
channels=seg.channels,
rate=seg.frame_rate,
output=True)
# Just in case there were any exceptions/interrupts, we release the resource
# So as not to raise OSError: Device Unavailable should play() be used again
try:
# break audio into half-second chunks (to allow keyboard interrupts)
for chunk in make_chunks(seg, 500):
stream.write(chunk._data)
finally:
stream.stop_stream()
stream.close()
p.terminate()
def _play_with_simpleaudio(seg):
import simpleaudio
return simpleaudio.play_buffer(
seg.raw_data,
num_channels=seg.channels,
bytes_per_sample=seg.sample_width,
sample_rate=seg.frame_rate
)
def play(audio_segment):
try:
playback = _play_with_simpleaudio(audio_segment)
try:
playback.wait_done()
except KeyboardInterrupt:
playback.stop()
except ImportError:
pass
else:
return
try:
_play_with_pyaudio(audio_segment)
return
except ImportError:
pass
_play_with_ffplay(audio_segment)

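play() hides the backend fallback chain, so calling code stays the same whichever of simpleaudio, pyaudio, or ffplay is available. A short sketch with a hypothetical file path:

from sbapp.pydub import AudioSegment
from sbapp.pydub.playback import play

seg = AudioSegment.from_file("notification.wav")  # hypothetical path
play(seg)  # simpleaudio if installed, else pyaudio, else the ffplay CLI
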
553
sbapp/pydub/pyaudioop.py Normal file

@ -0,0 +1,553 @@
try:
from __builtin__ import max as builtin_max
from __builtin__ import min as builtin_min
except ImportError:
from builtins import max as builtin_max
from builtins import min as builtin_min
import math
import struct
try:
from fractions import gcd
except ImportError: # Python 3.9+
from math import gcd
from ctypes import create_string_buffer
class error(Exception):
pass
def _check_size(size):
if size != 1 and size != 2 and size != 4:
raise error("Size should be 1, 2 or 4")
def _check_params(length, size):
_check_size(size)
if length % size != 0:
raise error("not a whole number of frames")
def _sample_count(cp, size):
return len(cp) // size
def _get_samples(cp, size, signed=True):
for i in range(_sample_count(cp, size)):
yield _get_sample(cp, size, i, signed)
def _struct_format(size, signed):
if size == 1:
return "b" if signed else "B"
elif size == 2:
return "h" if signed else "H"
elif size == 4:
return "i" if signed else "I"
def _get_sample(cp, size, i, signed=True):
fmt = _struct_format(size, signed)
return struct.unpack_from(fmt, cp, i * size)[0]
def _put_sample(cp, size, i, val, signed=True):
fmt = _struct_format(size, signed)
struct.pack_into(fmt, cp, i * size, val)
def _get_maxval(size, signed=True):
if signed and size == 1:
return 0x7f
elif size == 1:
return 0xff
elif signed and size == 2:
return 0x7fff
elif size == 2:
return 0xffff
elif signed and size == 4:
return 0x7fffffff
elif size == 4:
return 0xffffffff
def _get_minval(size, signed=True):
if not signed:
return 0
elif size == 1:
return -0x80
elif size == 2:
return -0x8000
elif size == 4:
return -0x80000000
def _get_clipfn(size, signed=True):
maxval = _get_maxval(size, signed)
minval = _get_minval(size, signed)
return lambda val: builtin_max(builtin_min(val, maxval), minval)
def _overflow(val, size, signed=True):
minval = _get_minval(size, signed)
maxval = _get_maxval(size, signed)
if minval <= val <= maxval:
return val
bits = size * 8
if signed:
offset = 2**(bits-1)
return ((val + offset) % (2**bits)) - offset
else:
return val % (2**bits)
def getsample(cp, size, i):
_check_params(len(cp), size)
if not (0 <= i < len(cp) / size):
raise error("Index out of range")
return _get_sample(cp, size, i)
def max(cp, size):
_check_params(len(cp), size)
if len(cp) == 0:
return 0
return builtin_max(abs(sample) for sample in _get_samples(cp, size))
def minmax(cp, size):
_check_params(len(cp), size)
max_sample, min_sample = 0, 0
for sample in _get_samples(cp, size):
max_sample = builtin_max(sample, max_sample)
min_sample = builtin_min(sample, min_sample)
return min_sample, max_sample
def avg(cp, size):
_check_params(len(cp), size)
sample_count = _sample_count(cp, size)
if sample_count == 0:
return 0
return sum(_get_samples(cp, size)) / sample_count
def rms(cp, size):
_check_params(len(cp), size)
sample_count = _sample_count(cp, size)
if sample_count == 0:
return 0
sum_squares = sum(sample**2 for sample in _get_samples(cp, size))
return int(math.sqrt(sum_squares / sample_count))
def _sum2(cp1, cp2, length):
size = 2
total = 0
for i in range(length):
total += getsample(cp1, size, i) * getsample(cp2, size, i)
return total
def findfit(cp1, cp2):
size = 2
if len(cp1) % 2 != 0 or len(cp2) % 2 != 0:
raise error("Strings should be even-sized")
if len(cp1) < len(cp2):
raise error("First sample should be longer")
len1 = _sample_count(cp1, size)
len2 = _sample_count(cp2, size)
sum_ri_2 = _sum2(cp2, cp2, len2)
sum_aij_2 = _sum2(cp1, cp1, len2)
sum_aij_ri = _sum2(cp1, cp2, len2)
result = (sum_ri_2 * sum_aij_2 - sum_aij_ri * sum_aij_ri) / sum_aij_2
best_result = result
best_i = 0
for i in range(1, len1 - len2 + 1):
aj_m1 = _get_sample(cp1, size, i - 1)
aj_lm1 = _get_sample(cp1, size, i + len2 - 1)
sum_aij_2 += aj_lm1**2 - aj_m1**2
sum_aij_ri = _sum2(cp1[i*size:], cp2, len2)
result = (sum_ri_2 * sum_aij_2 - sum_aij_ri * sum_aij_ri) / sum_aij_2
if result < best_result:
best_result = result
best_i = i
factor = _sum2(cp1[best_i*size:], cp2, len2) / sum_ri_2
return best_i, factor
def findfactor(cp1, cp2):
size = 2
if len(cp1) % 2 != 0:
raise error("Strings should be even-sized")
if len(cp1) != len(cp2):
raise error("Samples should be same size")
sample_count = _sample_count(cp1, size)
sum_ri_2 = _sum2(cp2, cp2, sample_count)
sum_aij_ri = _sum2(cp1, cp2, sample_count)
return sum_aij_ri / sum_ri_2
def findmax(cp, len2):
size = 2
sample_count = _sample_count(cp, size)
if len(cp) % 2 != 0:
raise error("Strings should be even-sized")
if len2 < 0 or sample_count < len2:
raise error("Input sample should be longer")
if sample_count == 0:
return 0
result = _sum2(cp, cp, len2)
best_result = result
best_i = 0
for i in range(1, sample_count - len2 + 1):
sample_leaving_window = getsample(cp, size, i - 1)
sample_entering_window = getsample(cp, size, i + len2 - 1)
result -= sample_leaving_window**2
result += sample_entering_window**2
if result > best_result:
best_result = result
best_i = i
return best_i
def avgpp(cp, size):
_check_params(len(cp), size)
sample_count = _sample_count(cp, size)
prevextremevalid = False
prevextreme = None
avg = 0
nextreme = 0
prevval = getsample(cp, size, 0)
val = getsample(cp, size, 1)
prevdiff = val - prevval
for i in range(1, sample_count):
val = getsample(cp, size, i)
diff = val - prevval
if diff * prevdiff < 0:
if prevextremevalid:
avg += abs(prevval - prevextreme)
nextreme += 1
prevextremevalid = True
prevextreme = prevval
prevval = val
if diff != 0:
prevdiff = diff
if nextreme == 0:
return 0
return avg / nextreme
def maxpp(cp, size):
_check_params(len(cp), size)
sample_count = _sample_count(cp, size)
prevextremevalid = False
prevextreme = None
max = 0
prevval = getsample(cp, size, 0)
val = getsample(cp, size, 1)
prevdiff = val - prevval
for i in range(1, sample_count):
val = getsample(cp, size, i)
diff = val - prevval
if diff * prevdiff < 0:
if prevextremevalid:
extremediff = abs(prevval - prevextreme)
if extremediff > max:
max = extremediff
prevextremevalid = True
prevextreme = prevval
prevval = val
if diff != 0:
prevdiff = diff
return max
def cross(cp, size):
_check_params(len(cp), size)
crossings = 0
last_sample = 0
for sample in _get_samples(cp, size):
if sample <= 0 < last_sample or sample >= 0 > last_sample:
crossings += 1
last_sample = sample
return crossings
def mul(cp, size, factor):
_check_params(len(cp), size)
clip = _get_clipfn(size)
result = create_string_buffer(len(cp))
for i, sample in enumerate(_get_samples(cp, size)):
sample = clip(int(sample * factor))
_put_sample(result, size, i, sample)
return result.raw
def tomono(cp, size, fac1, fac2):
_check_params(len(cp), size)
clip = _get_clipfn(size)
sample_count = _sample_count(cp, size)
result = create_string_buffer(len(cp) // 2)
for i in range(0, sample_count, 2):
l_sample = getsample(cp, size, i)
r_sample = getsample(cp, size, i + 1)
sample = (l_sample * fac1) + (r_sample * fac2)
sample = int(clip(sample))
_put_sample(result, size, i // 2, sample)
return result.raw
def tostereo(cp, size, fac1, fac2):
_check_params(len(cp), size)
sample_count = _sample_count(cp, size)
result = create_string_buffer(len(cp) * 2)
clip = _get_clipfn(size)
for i in range(sample_count):
sample = _get_sample(cp, size, i)
l_sample = int(clip(sample * fac1))
r_sample = int(clip(sample * fac2))
_put_sample(result, size, i * 2, l_sample)
_put_sample(result, size, i * 2 + 1, r_sample)
return result.raw
def add(cp1, cp2, size):
_check_params(len(cp1), size)
if len(cp1) != len(cp2):
raise error("Lengths should be the same")
clip = _get_clipfn(size)
sample_count = _sample_count(cp1, size)
result = create_string_buffer(len(cp1))
for i in range(sample_count):
sample1 = getsample(cp1, size, i)
sample2 = getsample(cp2, size, i)
sample = clip(sample1 + sample2)
_put_sample(result, size, i, sample)
return result.raw
def bias(cp, size, bias):
_check_params(len(cp), size)
result = create_string_buffer(len(cp))
for i, sample in enumerate(_get_samples(cp, size)):
sample = _overflow(sample + bias, size)
_put_sample(result, size, i, sample)
return result.raw
def reverse(cp, size):
_check_params(len(cp), size)
sample_count = _sample_count(cp, size)
result = create_string_buffer(len(cp))
for i, sample in enumerate(_get_samples(cp, size)):
_put_sample(result, size, sample_count - i - 1, sample)
return result.raw
def lin2lin(cp, size, size2):
_check_params(len(cp), size)
_check_size(size2)
if size == size2:
return cp
new_len = (len(cp) // size) * size2
result = create_string_buffer(new_len)
for i in range(_sample_count(cp, size)):
sample = _get_sample(cp, size, i)
if size < size2:
sample = sample << (8 * (size2 - size))
elif size > size2:
sample = sample >> (8 * (size - size2))
sample = _overflow(sample, size2)
_put_sample(result, size2, i, sample)
return result.raw
def ratecv(cp, size, nchannels, inrate, outrate, state, weightA=1, weightB=0):
_check_params(len(cp), size)
if nchannels < 1:
raise error("# of channels should be >= 1")
bytes_per_frame = size * nchannels
frame_count = len(cp) // bytes_per_frame
if bytes_per_frame / nchannels != size:
raise OverflowError("width * nchannels too big for a C int")
if weightA < 1 or weightB < 0:
raise error("weightA should be >= 1, weightB should be >= 0")
if len(cp) % bytes_per_frame != 0:
raise error("not a whole number of frames")
if inrate <= 0 or outrate <= 0:
raise error("sampling rate not > 0")
d = gcd(inrate, outrate)
inrate //= d
outrate //= d
prev_i = [0] * nchannels
cur_i = [0] * nchannels
if state is None:
d = -outrate
else:
d, samps = state
if len(samps) != nchannels:
raise error("illegal state argument")
prev_i, cur_i = zip(*samps)
prev_i, cur_i = list(prev_i), list(cur_i)
q = frame_count // inrate
ceiling = (q + 1) * outrate
nbytes = ceiling * bytes_per_frame
result = create_string_buffer(nbytes)
samples = _get_samples(cp, size)
out_i = 0
while True:
while d < 0:
if frame_count == 0:
samps = zip(prev_i, cur_i)
retval = result.raw
# slice off extra bytes (out_i counts per-channel samples written)
retval = retval[:out_i * size]
return (retval, (d, tuple(samps)))
for chan in range(nchannels):
prev_i[chan] = cur_i[chan]
cur_i[chan] = next(samples)
cur_i[chan] = (
(weightA * cur_i[chan] + weightB * prev_i[chan])
// (weightA + weightB)
)
frame_count -= 1
d += outrate
while d >= 0:
for chan in range(nchannels):
cur_o = (
(prev_i[chan] * d + cur_i[chan] * (outrate - d))
// outrate
)
_put_sample(result, size, out_i, _overflow(cur_o, size))
out_i += 1
d -= inrate
def lin2ulaw(cp, size):
raise NotImplementedError()
def ulaw2lin(cp, size):
raise NotImplementedError()
def lin2alaw(cp, size):
raise NotImplementedError()
def alaw2lin(cp, size):
raise NotImplementedError()
def lin2adpcm(cp, size, state):
raise NotImplementedError()
def adpcm2lin(cp, size, state):
raise NotImplementedError()

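The module mirrors the stdlib audioop API closely enough to act as a drop-in fallback, which is how pydub.utils imports it. A small sketch exercising it directly (the sample values assume a little-endian platform, since struct packs in native byte order here):

from sbapp.pydub import pyaudioop

raw = b"\x00\x00\x10\x00\x20\x00"          # three 16-bit samples: 0, 16, 32
print(pyaudioop.max(raw, 2))               # -> 32 (largest absolute sample)
print(pyaudioop.rms(raw, 2))               # -> 20 (integer root mean square)
halved = pyaudioop.mul(raw, 2, 0.5)        # scale amplitude by 0.5
print(pyaudioop.getsample(halved, 2, 2))   # -> 16
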
175
sbapp/pydub/scipy_effects.py Normal file

@ -0,0 +1,175 @@
"""
This module provides scipy versions of high_pass_filter and low_pass_filter,
as well as an additional band_pass_filter.
Of course, you will need to install scipy for these to work.
When this module is imported the high and low pass filters from this module
will be used when calling audio_segment.high_pass_filter() and
audio_segment.low_pass_filter() instead of the slower, less powerful versions
provided by pydub.effects.
"""
from scipy.signal import butter, sosfilt
from .audio_segment import AudioSegment
from .utils import (register_pydub_effect, stereo_to_ms, ms_to_stereo)
def _mk_butter_filter(freq, type, order):
"""
Args:
freq: The cutoff frequency for highpass and lowpass filters. For
band filters, a list of [low_cutoff, high_cutoff]
type: "lowpass", "highpass", or "band"
order: nth order butterworth filter (default: 5th order). The
attenuation is -6dB/octave beyond the cutoff frequency (for 1st
order). A higher order filter will have more attenuation, each level
adding an additional -6dB (so a 3rd order butterworth filter would
be -18dB/octave).
Returns:
function which can filter a mono audio segment
"""
def filter_fn(seg):
assert seg.channels == 1
nyq = 0.5 * seg.frame_rate
try:
freqs = [f / nyq for f in freq]
except TypeError:
freqs = freq / nyq
sos = butter(order, freqs, btype=type, output='sos')
y = sosfilt(sos, seg.get_array_of_samples())
return seg._spawn(y.astype(seg.array_type))
return filter_fn
@register_pydub_effect
def band_pass_filter(seg, low_cutoff_freq, high_cutoff_freq, order=5):
filter_fn = _mk_butter_filter([low_cutoff_freq, high_cutoff_freq], 'band', order=order)
return seg.apply_mono_filter_to_each_channel(filter_fn)
@register_pydub_effect
def high_pass_filter(seg, cutoff_freq, order=5):
filter_fn = _mk_butter_filter(cutoff_freq, 'highpass', order=order)
return seg.apply_mono_filter_to_each_channel(filter_fn)
@register_pydub_effect
def low_pass_filter(seg, cutoff_freq, order=5):
filter_fn = _mk_butter_filter(cutoff_freq, 'lowpass', order=order)
return seg.apply_mono_filter_to_each_channel(filter_fn)
@register_pydub_effect
def _eq(seg, focus_freq, bandwidth=100, mode="peak", gain_dB=0, order=2):
"""
Args:
focus_freq - middle frequency or known frequency of band (in Hz)
bandwidth - range of the equalizer band
mode - Mode of equalization (peak/notch (bell curve), high shelf, low shelf)
order - Rolloff factor (1: 6 dB/octave, 2: 12 dB/octave)
Returns:
Equalized/Filtered AudioSegment
"""
filt_mode = ["peak", "low_shelf", "high_shelf"]
if mode not in filt_mode:
raise ValueError("Incorrect Mode Selection")
if gain_dB >= 0:
if mode == "peak":
sec = band_pass_filter(seg, focus_freq - bandwidth/2, focus_freq + bandwidth/2, order = order)
seg = seg.overlay(sec - (3 - gain_dB))
return seg
if mode == "low_shelf":
sec = low_pass_filter(seg, focus_freq, order=order)
seg = seg.overlay(sec - (3 - gain_dB))
return seg
if mode == "high_shelf":
sec = high_pass_filter(seg, focus_freq, order=order)
seg = seg.overlay(sec - (3 - gain_dB))
return seg
if gain_dB < 0:
if mode == "peak":
sec = high_pass_filter(seg, focus_freq - bandwidth/2, order=order)
seg = seg.overlay(sec - (3 + gain_dB)) + gain_dB
sec = low_pass_filter(seg, focus_freq + bandwidth/2, order=order)
seg = seg.overlay(sec - (3 + gain_dB)) + gain_dB
return seg
if mode == "low_shelf":
sec = high_pass_filter(seg, focus_freq, order=order)
seg = seg.overlay(sec - (3 + gain_dB)) + gain_dB
return seg
if mode == "high_shelf":
sec = low_pass_filter(seg, focus_freq, order=order)
seg = seg.overlay(sec - (3 + gain_dB)) + gain_dB
return seg
@register_pydub_effect
def eq(seg, focus_freq, bandwidth=100, channel_mode="L+R", filter_mode="peak", gain_dB=0, order=2):
"""
Args:
focus_freq - middle frequency or known frequency of band (in Hz)
bandwidth - range of the equalizer band
channel_mode - Select Channels to be affected by the filter.
L+R - Standard Stereo Filter
L - Only Left Channel is Filtered
R - Only Right Channel is Filtered
M+S - Blumlein Stereo Filter (Mid-Side)
M - Only Mid Channel is Filtered
S - Only Side Channel is Filtered
Mono Audio Segments are completely filtered.
filter_mode - Mode of equalization (peak/notch (bell curve), high shelf, low shelf)
order - Rolloff factor (1: 6 dB/octave, 2: 12 dB/octave)
Returns:
Equalized/Filtered AudioSegment
"""
channel_modes = ["L+R", "M+S", "L", "R", "M", "S"]
if channel_mode not in channel_modes:
raise ValueError("Incorrect Channel Mode Selection")
if seg.channels == 1:
return _eq(seg, focus_freq, bandwidth, filter_mode, gain_dB, order)
if channel_mode == "L+R":
return _eq(seg, focus_freq, bandwidth, filter_mode, gain_dB, order)
if channel_mode == "L":
seg = seg.split_to_mono()
seg = [_eq(seg[0], focus_freq, bandwidth, filter_mode, gain_dB, order), seg[1]]
return AudioSegment.from_mono_audiosegments(seg[0], seg[1])
if channel_mode == "R":
seg = seg.split_to_mono()
seg = [seg[0], _eq(seg[1], focus_freq, bandwidth, filter_mode, gain_dB, order)]
return AudioSegment.from_mono_audiosegments(seg[0], seg[1])
if channel_mode == "M+S":
seg = stereo_to_ms(seg)
seg = _eq(seg, focus_freq, bandwidth, filter_mode, gain_dB, order)
return ms_to_stereo(seg)
if channel_mode == "M":
seg = stereo_to_ms(seg).split_to_mono()
seg = [_eq(seg[0], focus_freq, bandwidth, filter_mode, gain_dB, order), seg[1]]
seg = AudioSegment.from_mono_audiosegments(seg[0], seg[1])
return ms_to_stereo(seg)
if channel_mode == "S":
seg = stereo_to_ms(seg).split_to_mono()
seg = [seg[0], _eq(seg[1], focus_freq, bandwidth, filter_mode, gain_dB, order)]
seg = AudioSegment.from_mono_audiosegments(seg[0], seg[1])
return ms_to_stereo(seg)

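Because the module re-registers high_pass_filter and low_pass_filter at import time, a bare import is enough to switch an AudioSegment over to the scipy implementations. A minimal sketch (scipy required; the input path is hypothetical):

from sbapp.pydub import AudioSegment
import sbapp.pydub.scipy_effects  # noqa: F401 -- registers the scipy-backed filters

seg = AudioSegment.from_file("voice.wav")        # hypothetical input
seg = seg.band_pass_filter(300, 3400, order=5)   # telephone-style band
seg = seg.eq(1000, bandwidth=200, filter_mode="peak", gain_dB=3)
seg.export("voice_filtered.wav", format="wav")
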
182
sbapp/pydub/silence.py Normal file

@ -0,0 +1,182 @@
"""
Various functions for finding/manipulating silence in AudioSegments
"""
import itertools
from .utils import db_to_float
def detect_silence(audio_segment, min_silence_len=1000, silence_thresh=-16, seek_step=1):
"""
Returns a list of all silent sections [start, end] in milliseconds of audio_segment.
Inverse of detect_nonsilent()
audio_segment - the segment to find silence in
min_silence_len - the minimum length for any silent section
silence_thresh - the upper bound for how quiet is silent in dBFS
seek_step - step size for iterating over the segment in ms
"""
seg_len = len(audio_segment)
# you can't have a silent portion of a sound that is longer than the sound
if seg_len < min_silence_len:
return []
# convert silence threshold to a float value (so we can compare it to rms)
silence_thresh = db_to_float(silence_thresh) * audio_segment.max_possible_amplitude
# find silence and add start and end indices to the to_cut list
silence_starts = []
# check successive (1 sec by default) chunk of sound for silence
# try a chunk at every "seek step" (or every chunk for a seek step == 1)
last_slice_start = seg_len - min_silence_len
slice_starts = range(0, last_slice_start + 1, seek_step)
# guarantee last_slice_start is included in the range
# to make sure the last portion of the audio is searched
if last_slice_start % seek_step:
slice_starts = itertools.chain(slice_starts, [last_slice_start])
for i in slice_starts:
audio_slice = audio_segment[i:i + min_silence_len]
if audio_slice.rms <= silence_thresh:
silence_starts.append(i)
# short circuit when there is no silence
if not silence_starts:
return []
# combine the silence we detected into ranges (start ms - end ms)
silent_ranges = []
prev_i = silence_starts.pop(0)
current_range_start = prev_i
for silence_start_i in silence_starts:
continuous = (silence_start_i == prev_i + seek_step)
# sometimes two small blips are enough for one particular slice to be
# non-silent, despite the silence all running together. Just combine
# the two overlapping silent ranges.
silence_has_gap = silence_start_i > (prev_i + min_silence_len)
if not continuous and silence_has_gap:
silent_ranges.append([current_range_start,
prev_i + min_silence_len])
current_range_start = silence_start_i
prev_i = silence_start_i
silent_ranges.append([current_range_start,
prev_i + min_silence_len])
return silent_ranges
def detect_nonsilent(audio_segment, min_silence_len=1000, silence_thresh=-16, seek_step=1):
"""
Returns a list of all nonsilent sections [start, end] in milliseconds of audio_segment.
Inverse of detect_silence()
audio_segment - the segment to find silence in
min_silence_len - the minimum length for any silent section
silence_thresh - the upper bound for how quiet is silent in dBFS
seek_step - step size for iterating over the segment in ms
"""
silent_ranges = detect_silence(audio_segment, min_silence_len, silence_thresh, seek_step)
len_seg = len(audio_segment)
# if there is no silence, the whole thing is nonsilent
if not silent_ranges:
return [[0, len_seg]]
# short circuit when the whole audio segment is silent
if silent_ranges[0][0] == 0 and silent_ranges[0][1] == len_seg:
return []
prev_end_i = 0
nonsilent_ranges = []
for start_i, end_i in silent_ranges:
nonsilent_ranges.append([prev_end_i, start_i])
prev_end_i = end_i
if end_i != len_seg:
nonsilent_ranges.append([prev_end_i, len_seg])
if nonsilent_ranges[0] == [0, 0]:
nonsilent_ranges.pop(0)
return nonsilent_ranges
def split_on_silence(audio_segment, min_silence_len=1000, silence_thresh=-16, keep_silence=100,
seek_step=1):
"""
Returns list of audio segments from splitting audio_segment on silent sections
audio_segment - original pydub.AudioSegment() object
min_silence_len - (in ms) minimum length of a silence to be used for
a split. default: 1000ms
silence_thresh - (in dBFS) anything quieter than this will be
considered silence. default: -16dBFS
keep_silence - (in ms or True/False) leave some silence at the beginning
and end of the chunks. Keeps the sound from sounding like it
is abruptly cut off.
When the length of the silence is less than the keep_silence duration
it is split evenly between the preceding and following non-silent
segments.
If True is specified, all the silence is kept, if False none is kept.
default: 100ms
seek_step - step size for iterating over the segment in ms
"""
# from the itertools documentation
def pairwise(iterable):
"s -> (s0,s1), (s1,s2), (s2, s3), ..."
a, b = itertools.tee(iterable)
next(b, None)
return zip(a, b)
if isinstance(keep_silence, bool):
keep_silence = len(audio_segment) if keep_silence else 0
output_ranges = [
[ start - keep_silence, end + keep_silence ]
for (start,end)
in detect_nonsilent(audio_segment, min_silence_len, silence_thresh, seek_step)
]
for range_i, range_ii in pairwise(output_ranges):
last_end = range_i[1]
next_start = range_ii[0]
if next_start < last_end:
range_i[1] = (last_end+next_start)//2
range_ii[0] = range_i[1]
return [
audio_segment[ max(start,0) : min(end,len(audio_segment)) ]
for start,end in output_ranges
]
def detect_leading_silence(sound, silence_threshold=-50.0, chunk_size=10):
"""
Returns the millisecond/index that the leading silence ends.
sound - the segment to find silence in
silence_threshold - the upper bound for how quiet is silent in dBFS
chunk_size - chunk size for iterating over the segment in ms
"""
trim_ms = 0 # ms
assert chunk_size > 0 # to avoid infinite loop
while sound[trim_ms:trim_ms+chunk_size].dBFS < silence_threshold and trim_ms < len(sound):
trim_ms += chunk_size
# if there is no end it should return the length of the segment
return min(trim_ms, len(sound))

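A typical use is trimming leading silence and then splitting a recording into utterances; keep_silence preserves some padding so the chunks do not sound clipped. A short sketch with a hypothetical input file:

from sbapp.pydub import AudioSegment
from sbapp.pydub.silence import detect_leading_silence, split_on_silence

seg = AudioSegment.from_file("interview.wav")  # hypothetical input
seg = seg[detect_leading_silence(seg, silence_threshold=-50.0):]
chunks = split_on_silence(seg, min_silence_len=700, silence_thresh=-40, keep_silence=150)
for i, chunk in enumerate(chunks):
    chunk.export("utterance_{0}.wav".format(i), format="wav")
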
434
sbapp/pydub/utils.py Normal file

@ -0,0 +1,434 @@
from __future__ import division
import json
import os
import re
import sys
from subprocess import Popen, PIPE
from math import log, ceil
from tempfile import TemporaryFile
from warnings import warn
from functools import wraps
try:
import audioop
except ImportError:
import pyaudioop as audioop
if sys.version_info >= (3, 0):
basestring = str
FRAME_WIDTHS = {
8: 1,
16: 2,
32: 4,
}
ARRAY_TYPES = {
8: "b",
16: "h",
32: "i",
}
ARRAY_RANGES = {
8: (-0x80, 0x7f),
16: (-0x8000, 0x7fff),
32: (-0x80000000, 0x7fffffff),
}
def get_frame_width(bit_depth):
return FRAME_WIDTHS[bit_depth]
def get_array_type(bit_depth, signed=True):
t = ARRAY_TYPES[bit_depth]
if not signed:
t = t.upper()
return t
def get_min_max_value(bit_depth):
return ARRAY_RANGES[bit_depth]
def _fd_or_path_or_tempfile(fd, mode='w+b', tempfile=True):
close_fd = False
if fd is None and tempfile:
fd = TemporaryFile(mode=mode)
close_fd = True
if isinstance(fd, basestring):
fd = open(fd, mode=mode)
close_fd = True
try:
if isinstance(fd, os.PathLike):
fd = open(fd, mode=mode)
close_fd = True
except AttributeError:
# module os has no attribute PathLike, so we're on python < 3.6.
# The protocol we're trying to support doesn't exist, so just pass.
pass
return fd, close_fd
def db_to_float(db, using_amplitude=True):
"""
Converts the input db to a float, which represents the equivalent
ratio in amplitude (or in power, when using_amplitude=False).
"""
db = float(db)
if using_amplitude:
return 10 ** (db / 20)
else: # using power
return 10 ** (db / 10)
def ratio_to_db(ratio, val2=None, using_amplitude=True):
"""
Converts the input float to db, treating it as an amplitude ratio
(or a power ratio, when using_amplitude=False).
"""
ratio = float(ratio)
# accept 2 values and use the ratio of val1 to val2
if val2 is not None:
ratio = ratio / val2
# special case for multiply-by-zero (convert to silence)
if ratio == 0:
return -float('inf')
if using_amplitude:
return 20 * log(ratio, 10)
else: # using power
return 10 * log(ratio, 10)
def register_pydub_effect(fn, name=None):
"""
decorator for adding pydub effects to the AudioSegment objects.
example use:
@register_pydub_effect
def normalize(audio_segment):
...
or you can specify a name:
@register_pydub_effect("normalize")
def normalize_audio_segment(audio_segment):
...
"""
if isinstance(fn, basestring):
name = fn
return lambda fn: register_pydub_effect(fn, name)
if name is None:
name = fn.__name__
from .audio_segment import AudioSegment
setattr(AudioSegment, name, fn)
return fn
def make_chunks(audio_segment, chunk_length):
"""
Breaks an AudioSegment into chunks that are <chunk_length> milliseconds
long.
if chunk_length is 50 then you'll get a list of 50 millisecond long audio
segments back (except the last one, which can be shorter)
"""
number_of_chunks = ceil(len(audio_segment) / float(chunk_length))
return [audio_segment[i * chunk_length:(i + 1) * chunk_length]
for i in range(int(number_of_chunks))]
def which(program):
"""
Mimics behavior of UNIX which command.
"""
# Add .exe program extension for windows support
if os.name == "nt" and not program.endswith(".exe"):
program += ".exe"
envdir_list = [os.curdir] + os.environ["PATH"].split(os.pathsep)
for envdir in envdir_list:
program_path = os.path.join(envdir, program)
if os.path.isfile(program_path) and os.access(program_path, os.X_OK):
return program_path
def get_encoder_name():
"""
Return the default encoder application for the system, either avconv or ffmpeg
"""
if which("avconv"):
return "avconv"
elif which("ffmpeg"):
return "ffmpeg"
else:
# should raise exception
warn("Couldn't find ffmpeg or avconv - defaulting to ffmpeg, but may not work", RuntimeWarning)
return "ffmpeg"
def get_player_name():
"""
Return the default player application for the system, either avplay or ffplay
"""
if which("avplay"):
return "avplay"
elif which("ffplay"):
return "ffplay"
else:
# should raise exception
warn("Couldn't find ffplay or avplay - defaulting to ffplay, but may not work", RuntimeWarning)
return "ffplay"
def get_prober_name():
"""
Return the default probe application for the system, either avprobe or ffprobe
"""
if which("avprobe"):
return "avprobe"
elif which("ffprobe"):
return "ffprobe"
else:
# should raise exception
warn("Couldn't find ffprobe or avprobe - defaulting to ffprobe, but may not work", RuntimeWarning)
return "ffprobe"
def fsdecode(filename):
"""Wrapper for os.fsdecode which was introduced in python 3.2 ."""
if sys.version_info >= (3, 2):
PathLikeTypes = (basestring, bytes)
if sys.version_info >= (3, 6):
PathLikeTypes += (os.PathLike,)
if isinstance(filename, PathLikeTypes):
return os.fsdecode(filename)
else:
if isinstance(filename, bytes):
return filename.decode(sys.getfilesystemencoding())
if isinstance(filename, basestring):
return filename
raise TypeError("type {0} not accepted by fsdecode".format(type(filename)))
def get_extra_info(stderr):
"""
avprobe sometimes gives more information on stderr than
on the json output. The information has to be extracted
from stderr of the format of:
' Stream #0:0: Audio: flac, 88200 Hz, stereo, s32 (24 bit)'
or (macOS version):
' Stream #0:0: Audio: vorbis'
' 44100 Hz, stereo, fltp, 320 kb/s'
:type stderr: str
:rtype: list of dict
"""
extra_info = {}
re_stream = r'(?P<space_start> +)Stream #0[:\.](?P<stream_id>([0-9]+))(?P<content_0>.+)\n?(?! *Stream)((?P<space_end> +)(?P<content_1>.+))?'
for i in re.finditer(re_stream, stderr):
if i.group('space_end') is not None and len(i.group('space_start')) <= len(
i.group('space_end')):
content_line = ','.join([i.group('content_0'), i.group('content_1')])
else:
content_line = i.group('content_0')
tokens = [x.strip() for x in re.split('[:,]', content_line) if x]
extra_info[int(i.group('stream_id'))] = tokens
return extra_info
def mediainfo_json(filepath, read_ahead_limit=-1):
"""Return json dictionary with media info(codec, duration, size, bitrate...) from filepath
"""
prober = get_prober_name()
command_args = [
"-v", "info",
"-show_format",
"-show_streams",
]
try:
command_args += [fsdecode(filepath)]
stdin_parameter = None
stdin_data = None
except TypeError:
if prober == 'ffprobe':
command_args += ["-read_ahead_limit", str(read_ahead_limit),
"cache:pipe:0"]
else:
command_args += ["-"]
stdin_parameter = PIPE
file, close_file = _fd_or_path_or_tempfile(filepath, 'rb', tempfile=False)
file.seek(0)
stdin_data = file.read()
if close_file:
file.close()
command = [prober, '-of', 'json'] + command_args
res = Popen(command, stdin=stdin_parameter, stdout=PIPE, stderr=PIPE)
output, stderr = res.communicate(input=stdin_data)
output = output.decode("utf-8", 'ignore')
stderr = stderr.decode("utf-8", 'ignore')
info = json.loads(output)
if not info:
# If ffprobe didn't give any information, just return it
# (for example, because the file doesn't exist)
return info
extra_info = get_extra_info(stderr)
audio_streams = [x for x in info['streams'] if x['codec_type'] == 'audio']
if len(audio_streams) == 0:
return info
# We just operate on the first audio stream in case there are more
stream = audio_streams[0]
def set_property(stream, prop, value):
if prop not in stream or stream[prop] == 0:
stream[prop] = value
for token in extra_info[stream['index']]:
m = re.match(r'([su]([0-9]{1,2})p?) \(([0-9]{1,2}) bit\)$', token)
m2 = re.match(r'([su]([0-9]{1,2})p?)( \(default\))?$', token)
if m:
set_property(stream, 'sample_fmt', m.group(1))
set_property(stream, 'bits_per_sample', int(m.group(2)))
set_property(stream, 'bits_per_raw_sample', int(m.group(3)))
elif m2:
set_property(stream, 'sample_fmt', m2.group(1))
set_property(stream, 'bits_per_sample', int(m2.group(2)))
set_property(stream, 'bits_per_raw_sample', int(m2.group(2)))
elif re.match(r'(flt)p?( \(default\))?$', token):
set_property(stream, 'sample_fmt', token)
set_property(stream, 'bits_per_sample', 32)
set_property(stream, 'bits_per_raw_sample', 32)
elif re.match(r'(dbl)p?( \(default\))?$', token):
set_property(stream, 'sample_fmt', token)
set_property(stream, 'bits_per_sample', 64)
set_property(stream, 'bits_per_raw_sample', 64)
return info
def mediainfo(filepath):
"""Return dictionary with media info(codec, duration, size, bitrate...) from filepath
"""
prober = get_prober_name()
command_args = [
"-v", "quiet",
"-show_format",
"-show_streams",
filepath
]
command = [prober, '-of', 'old'] + command_args
res = Popen(command, stdout=PIPE)
output = res.communicate()[0].decode("utf-8")
if res.returncode != 0:
command = [prober] + command_args
output = Popen(command, stdout=PIPE).communicate()[0].decode("utf-8")
rgx = re.compile(r"(?:(?P<inner_dict>.*?):)?(?P<key>.*?)\=(?P<value>.*?)$")
info = {}
if sys.platform == 'win32':
output = output.replace("\r", "")
for line in output.split("\n"):
# print(line)
mobj = rgx.match(line)
if mobj:
# print(mobj.groups())
inner_dict, key, value = mobj.groups()
if inner_dict:
try:
info[inner_dict]
except KeyError:
info[inner_dict] = {}
info[inner_dict][key] = value
else:
info[key] = value
return info
def cache_codecs(function):
cache = {}
@wraps(function)
def wrapper():
try:
return cache[0]
except KeyError:
cache[0] = function()
return cache[0]
return wrapper
@cache_codecs
def get_supported_codecs():
encoder = get_encoder_name()
command = [encoder, "-codecs"]
res = Popen(command, stdout=PIPE, stderr=PIPE)
output = res.communicate()[0].decode("utf-8")
if res.returncode != 0:
return []
if sys.platform == 'win32':
output = output.replace("\r", "")
rgx = re.compile(r"^([D.][E.][AVS.][I.][L.][S.]) (\w*) +(.*)")
decoders = set()
encoders = set()
for line in output.split('\n'):
match = rgx.match(line.strip())
if not match:
continue
flags, codec, name = match.groups()
if flags[0] == 'D':
decoders.add(codec)
if flags[1] == 'E':
encoders.add(codec)
return (decoders, encoders)
def get_supported_decoders():
return get_supported_codecs()[0]
def get_supported_encoders():
return get_supported_codecs()[1]
def stereo_to_ms(audio_segment):
'''
Left-Right -> Mid-Side
'''
from .audio_segment import AudioSegment
channel = audio_segment.split_to_mono()
channel = [channel[0].overlay(channel[1]), channel[0].overlay(channel[1].invert_phase())]
return AudioSegment.from_mono_audiosegments(channel[0], channel[1])
def ms_to_stereo(audio_segment):
'''
Mid-Side -> Left-Right
'''
from .audio_segment import AudioSegment
channel = audio_segment.split_to_mono()
channel = [channel[0].overlay(channel[1]) - 3, channel[0].overlay(channel[1].invert_phase()) - 3]
return AudioSegment.from_mono_audiosegments(channel[0], channel[1])
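
The dB helpers are exact inverses under the default amplitude interpretation, which is worth a sanity check since the rest of the package leans on them for all gain math. A small sketch (AudioSegment.silent() is standard pydub API):

from sbapp.pydub import AudioSegment
from sbapp.pydub.utils import db_to_float, ratio_to_db, make_chunks

assert abs(db_to_float(6.0) - 1.9952623149688795) < 1e-12    # +6 dB is roughly double amplitude
assert abs(ratio_to_db(db_to_float(-3.0)) - (-3.0)) < 1e-12  # round trip
seg = AudioSegment.silent(duration=1234)
chunks = make_chunks(seg, 500)  # three chunks: 500 ms, 500 ms, and the ~234 ms remainder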