From 53479d47008eb80235ec2ea21676a28c2a0a8139 Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Tue, 4 Jun 2024 03:19:24 +0200 Subject: [PATCH] Added PyDub --- sbapp/pydub/__init__.py | 1 + sbapp/pydub/audio_segment.py | 1399 ++++++++++++++++++++++++++++++++++ sbapp/pydub/effects.py | 341 +++++++++ sbapp/pydub/exceptions.py | 32 + sbapp/pydub/generators.py | 142 ++++ sbapp/pydub/logging_utils.py | 14 + sbapp/pydub/playback.py | 71 ++ sbapp/pydub/pyaudioop.py | 553 ++++++++++++++ sbapp/pydub/scipy_effects.py | 175 +++++ sbapp/pydub/silence.py | 182 +++++ sbapp/pydub/utils.py | 434 +++++++++++ 11 files changed, 3344 insertions(+) create mode 100644 sbapp/pydub/__init__.py create mode 100644 sbapp/pydub/audio_segment.py create mode 100644 sbapp/pydub/effects.py create mode 100644 sbapp/pydub/exceptions.py create mode 100644 sbapp/pydub/generators.py create mode 100644 sbapp/pydub/logging_utils.py create mode 100644 sbapp/pydub/playback.py create mode 100644 sbapp/pydub/pyaudioop.py create mode 100644 sbapp/pydub/scipy_effects.py create mode 100644 sbapp/pydub/silence.py create mode 100644 sbapp/pydub/utils.py diff --git a/sbapp/pydub/__init__.py b/sbapp/pydub/__init__.py new file mode 100644 index 0000000..65e30b4 --- /dev/null +++ b/sbapp/pydub/__init__.py @@ -0,0 +1 @@ +from .audio_segment import AudioSegment \ No newline at end of file diff --git a/sbapp/pydub/audio_segment.py b/sbapp/pydub/audio_segment.py new file mode 100644 index 0000000..14ea46e --- /dev/null +++ b/sbapp/pydub/audio_segment.py @@ -0,0 +1,1399 @@ +from __future__ import division + +import array +import os +import subprocess +from tempfile import TemporaryFile, NamedTemporaryFile +import wave +import sys +import struct +from .logging_utils import log_conversion, log_subprocess_output +from .utils import mediainfo_json, fsdecode +import base64 +from collections import namedtuple + +try: + from StringIO import StringIO +except: + from io import StringIO + +from io import BytesIO + +try: + from itertools import izip +except: + izip = zip + +from .utils import ( + _fd_or_path_or_tempfile, + db_to_float, + ratio_to_db, + get_encoder_name, + get_array_type, + audioop, +) +from .exceptions import ( + TooManyMissingFrames, + InvalidDuration, + InvalidID3TagVersion, + InvalidTag, + CouldntDecodeError, + CouldntEncodeError, + MissingAudioParameter, +) + +if sys.version_info >= (3, 0): + basestring = str + xrange = range + StringIO = BytesIO + + +class ClassPropertyDescriptor(object): + + def __init__(self, fget, fset=None): + self.fget = fget + self.fset = fset + + def __get__(self, obj, klass=None): + if klass is None: + klass = type(obj) + return self.fget.__get__(obj, klass)() + + def __set__(self, obj, value): + if not self.fset: + raise AttributeError("can't set attribute") + type_ = type(obj) + return self.fset.__get__(obj, type_)(value) + + def setter(self, func): + if not isinstance(func, (classmethod, staticmethod)): + func = classmethod(func) + self.fset = func + return self + + +def classproperty(func): + if not isinstance(func, (classmethod, staticmethod)): + func = classmethod(func) + + return ClassPropertyDescriptor(func) + + +AUDIO_FILE_EXT_ALIASES = { + "m4a": "mp4", + "wave": "wav", +} + +WavSubChunk = namedtuple('WavSubChunk', ['id', 'position', 'size']) +WavData = namedtuple('WavData', ['audio_format', 'channels', 'sample_rate', + 'bits_per_sample', 'raw_data']) + + +def extract_wav_headers(data): + # def search_subchunk(data, subchunk_id): + pos = 12 # The size of the RIFF chunk descriptor + subchunks = [] + 
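+    # A RIFF subchunk is laid out as a 4-byte ASCII id, a 4-byte
+    # little-endian size, then `size` bytes of payload; scan them
+    # sequentially starting at byte 12 (just past the RIFF descriptor).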
while pos + 8 <= len(data) and len(subchunks) < 10:
+        subchunk_id = data[pos:pos + 4]
+        subchunk_size = struct.unpack_from('<I', data[pos + 4:pos + 8])[0]
+        subchunks.append(WavSubChunk(subchunk_id, pos, subchunk_size))
+        if subchunk_id == b'data':
+            # 'data' is the last subchunk
+            break
+        pos += subchunk_size + 8
+
+    return subchunks
+
+
+def read_wav_audio(data, headers=None):
+    if not headers:
+        headers = extract_wav_headers(data)
+
+    fmt = [x for x in headers if x.id == b'fmt ']
+    if not fmt or fmt[0].size < 16:
+        raise CouldntDecodeError("Couldn't find fmt header in wav data")
+
+    fmt = fmt[0]
+    pos = fmt.position + 8
+    audio_format = struct.unpack_from('<H', data[pos:pos + 2])[0]
+    if audio_format != 1 and audio_format != 0xFFFE:
+        raise CouldntDecodeError("Unknown audio format 0x%X in wav data" %
+                                 audio_format)
+
+    channels = struct.unpack_from('<H', data[pos + 2:pos + 4])[0]
+    sample_rate = struct.unpack_from('<I', data[pos + 4:pos + 8])[0]
+    bits_per_sample = struct.unpack_from('<H', data[pos + 14:pos + 16])[0]
+
+    data_hdr = headers[-1]
+    if data_hdr.id != b'data':
+        raise CouldntDecodeError("Couldn't find data header in wav data")
+
+    pos = data_hdr.position + 8
+    return WavData(audio_format, channels, sample_rate, bits_per_sample,
+                   data[pos:pos + data_hdr.size])
+
+
+def fix_wav_headers(data):
+    headers = extract_wav_headers(data)
+    if not headers or headers[-1].id != b'data':
+        return
+
+    # TODO: Handle huge files in some other way
+    if len(data) > 2**32:
+        raise CouldntDecodeError("Unable to process >4GB files")
+
+    # Set the file size in the RIFF chunk descriptor
+    data[4:8] = struct.pack('<I', len(data) - 8)
+
+    # Set the size of the data subchunk
+    pos = headers[-1].position
+    data[pos + 4:pos + 8] = struct.pack('<I', len(data) - pos - 8)
+
+
+class AudioSegment(object):
+    """
+    AudioSegments are *immutable* objects representing segments of audio
+    that can be manipulated using python code.
+
+    AudioSegments are slicable using milliseconds.
+    for example:
+        a = AudioSegment.from_mp3(mp3file)
+        first_second = a[:1000] # get the first second of an mp3
+        slice = a[5000:10000] # get a slice from 5 to 10 seconds of an mp3
+    """
+    converter = get_encoder_name()  # either ffmpeg or avconv
+
+    # TODO: remove in 1.0 release
+    # maintain backwards compatibility for ffmpeg attr (deprecated)
+    @classproperty
+    def ffmpeg(cls):
+        return cls.converter
+
+    @ffmpeg.setter
+    def ffmpeg(cls, val):
+        cls.converter = val
+
+    DEFAULT_CODECS = {
+        "ogg": "libvorbis"
+    }
+
+    def __init__(self, data=None, *args, **kwargs):
+        self.sample_width = kwargs.pop("sample_width", None)
+        self.frame_rate = kwargs.pop("frame_rate", None)
+        self.channels = kwargs.pop("channels", None)
+
+        audio_params = (self.sample_width, self.frame_rate, self.channels)
+
+        if isinstance(data, array.array):
+            try:
+                data = data.tobytes()
+            except:
+                data = data.tostring()
+
+        # prevent partial specification of arguments
+        if any(audio_params) and None in audio_params:
+            raise MissingAudioParameter("Either all audio parameters or no parameter must be specified")
+
+        # all arguments are given
+        elif self.sample_width is not None:
+            if len(data) % (self.sample_width * self.channels) != 0:
+                raise ValueError("data length must be a multiple of '(sample_width * channels)'")
+
+            self.frame_width = self.channels * self.sample_width
+            self._data = data
+
+        # keep support for 'metadata' until audio params are used everywhere
+        elif kwargs.get('metadata', False):
+            # internal use only
+            self._data = data
+            for attr, val in kwargs.pop('metadata').items():
+                setattr(self, attr, val)
+        else:
+            # normal construction
+            try:
+                data = data if isinstance(data, (basestring, bytes)) else data.read()
+            except(OSError):
+                d = b''
+                reader = data.read(2 ** 31 - 1)
+                while reader:
+                    d += reader
+                    reader = data.read(2 ** 31 - 1)
+                data = d
+
+            wav_data = read_wav_audio(data)
+            if not wav_data:
+                raise CouldntDecodeError("Couldn't read wav audio from data")
+
+            self.channels = wav_data.channels
+            self.sample_width = wav_data.bits_per_sample // 8
+            self.frame_rate = wav_data.sample_rate
+            self.frame_width = self.channels * self.sample_width
+            self._data = wav_data.raw_data
+            if self.sample_width == 1:
+                # convert from unsigned integers in wav
+                self._data = audioop.bias(self._data, 1, -128)
+
+        # Convert 24-bit audio to 32-bit audio.
+        # (stdlib audioop and array modules do not support 24-bit data)
+        if self.sample_width == 3:
+            byte_buffer = BytesIO()
+
+            # Workaround for python 2 vs python 3. _data in 2.x are length-1 strings,
+            # and in 3.x are ints.
+            pack_fmt = 'BBB' if isinstance(self._data[0], int) else 'ccc'
+
+            # This conversion maintains the 24 bit values.  The values are
+            # not scaled up to the 32 bit range.  Other conversions could be
+            # implemented.
+            i = iter(self._data)
+            padding = {False: b'\x00', True: b'\xFF'}
+            for b0, b1, b2 in izip(i, i, i):
+                byte_buffer.write(padding[b2 > b'\x7f'[0]])
+                old_bytes = struct.pack(pack_fmt, b0, b1, b2)
+                byte_buffer.write(old_bytes)
+
+            self._data = byte_buffer.getvalue()
+            self.sample_width = 4
+            self.frame_width = self.channels * self.sample_width
+
+        super(AudioSegment, self).__init__(*args, **kwargs)
+
+    @property
+    def raw_data(self):
+        """
+        public access to the raw audio data as a bytestring
+        """
+        return self._data
+
+    def get_array_of_samples(self, array_type_override=None):
+        """
+        returns the raw_data as an array of samples
+        """
+        if array_type_override is None:
+            array_type_override = self.array_type
+        return array.array(array_type_override, self._data)
+
+    @property
+    def array_type(self):
+        return get_array_type(self.sample_width * 8)
+
+    def __len__(self):
+        """
+        returns the length of this audio segment in milliseconds
+        """
+        return round(1000 * (self.frame_count() / self.frame_rate))
+
+    def __eq__(self, other):
+        try:
+            return self._data == other._data
+        except:
+            return False
+
+    def __hash__(self):
+        return hash(AudioSegment) ^ hash((self.channels, self.frame_rate, self.sample_width, self._data))
+
+    def __ne__(self, other):
+        return not (self == other)
+
+    def __iter__(self):
+        return (self[i] for i in xrange(len(self)))
+
+    def __getitem__(self, millisecond):
+        if isinstance(millisecond, slice):
+            if millisecond.step:
+                return (
+                    self[i:i + millisecond.step]
+                    for i in xrange(*millisecond.indices(len(self)))
+                )
+
+            start = millisecond.start if millisecond.start is not None else 0
+            end = millisecond.stop if millisecond.stop is not None \
+                else len(self)
+
+            start = min(start, len(self))
+            end = min(end, len(self))
+        else:
+            start = millisecond
+            end = millisecond + 1
+
+        start = self._parse_position(start) * self.frame_width
+        end = self._parse_position(end) * self.frame_width
+        data = self._data[start:end]
+
+        # ensure the output is as long as the requester is expecting
+        expected_length = end - start
+        missing_frames = (expected_length - len(data)) // self.frame_width
+        if missing_frames:
+            if missing_frames > self.frame_count(ms=2):
+                raise TooManyMissingFrames(
+                    "You should never be filling in "
+                    "more than 2 ms with silence here, "
+                    "missing frames: %s" % missing_frames)
+            silence = audioop.mul(data[:self.frame_width],
+                                  self.sample_width, 0)
+            data += (silence * missing_frames)
+
+        return self._spawn(data)
+
+    def get_sample_slice(self, start_sample=None, end_sample=None):
+        """
+        Get a section of the audio segment by sample index.
+
+        NOTE: Negative indices do *not* address samples backward
+        from the end of the audio segment like a python list.
+        This is intentional.
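+
+        Illustrative example (names assumed)::
+
+            seg = AudioSegment.silent(duration=1000, frame_rate=44100)
+            first_100 = seg.get_sample_slice(0, 100)  # first 100 samples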
+ """ + max_val = int(self.frame_count()) + + def bounded(val, default): + if val is None: + return default + if val < 0: + return 0 + if val > max_val: + return max_val + return val + + start_i = bounded(start_sample, 0) * self.frame_width + end_i = bounded(end_sample, max_val) * self.frame_width + + data = self._data[start_i:end_i] + return self._spawn(data) + + def __add__(self, arg): + if isinstance(arg, AudioSegment): + return self.append(arg, crossfade=0) + else: + return self.apply_gain(arg) + + def __radd__(self, rarg): + """ + Permit use of sum() builtin with an iterable of AudioSegments + """ + if rarg == 0: + return self + raise TypeError("Gains must be the second addend after the " + "AudioSegment") + + def __sub__(self, arg): + if isinstance(arg, AudioSegment): + raise TypeError("AudioSegment objects can't be subtracted from " + "each other") + else: + return self.apply_gain(-arg) + + def __mul__(self, arg): + """ + If the argument is an AudioSegment, overlay the multiplied audio + segment. + + If it's a number, just use the string multiply operation to repeat the + audio. + + The following would return an AudioSegment that contains the + audio of audio_seg eight times + + `audio_seg * 8` + """ + if isinstance(arg, AudioSegment): + return self.overlay(arg, position=0, loop=True) + else: + return self._spawn(data=self._data * arg) + + def _spawn(self, data, overrides={}): + """ + Creates a new audio segment using the metadata from the current one + and the data passed in. Should be used whenever an AudioSegment is + being returned by an operation that would alters the current one, + since AudioSegment objects are immutable. + """ + # accept lists of data chunks + if isinstance(data, list): + data = b''.join(data) + + if isinstance(data, array.array): + try: + data = data.tobytes() + except: + data = data.tostring() + + # accept file-like objects + if hasattr(data, 'read'): + if hasattr(data, 'seek'): + data.seek(0) + data = data.read() + + metadata = { + 'sample_width': self.sample_width, + 'frame_rate': self.frame_rate, + 'frame_width': self.frame_width, + 'channels': self.channels + } + metadata.update(overrides) + return self.__class__(data=data, metadata=metadata) + + @classmethod + def _sync(cls, *segs): + channels = max(seg.channels for seg in segs) + frame_rate = max(seg.frame_rate for seg in segs) + sample_width = max(seg.sample_width for seg in segs) + + return tuple( + seg.set_channels(channels).set_frame_rate(frame_rate).set_sample_width(sample_width) + for seg in segs + ) + + def _parse_position(self, val): + if val < 0: + val = len(self) - abs(val) + val = self.frame_count(ms=len(self)) if val == float("inf") else \ + self.frame_count(ms=val) + return int(val) + + @classmethod + def empty(cls): + return cls(b'', metadata={ + "channels": 1, + "sample_width": 1, + "frame_rate": 1, + "frame_width": 1 + }) + + @classmethod + def silent(cls, duration=1000, frame_rate=11025): + """ + Generate a silent audio segment. + duration specified in milliseconds (default duration: 1000ms, default frame_rate: 11025). 
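+
+        Example (illustrative): half a second of silence for padding::
+
+            pad = AudioSegment.silent(duration=500, frame_rate=44100)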
+ """ + frames = int(frame_rate * (duration / 1000.0)) + data = b"\0\0" * frames + return cls(data, metadata={"channels": 1, + "sample_width": 2, + "frame_rate": frame_rate, + "frame_width": 2}) + + @classmethod + def from_mono_audiosegments(cls, *mono_segments): + if not len(mono_segments): + raise ValueError("At least one AudioSegment instance is required") + + segs = cls._sync(*mono_segments) + + if segs[0].channels != 1: + raise ValueError( + "AudioSegment.from_mono_audiosegments requires all arguments are mono AudioSegment instances") + + channels = len(segs) + sample_width = segs[0].sample_width + frame_rate = segs[0].frame_rate + + frame_count = max(int(seg.frame_count()) for seg in segs) + data = array.array( + segs[0].array_type, + b'\0' * (frame_count * sample_width * channels) + ) + + for i, seg in enumerate(segs): + data[i::channels] = seg.get_array_of_samples() + + return cls( + data, + channels=channels, + sample_width=sample_width, + frame_rate=frame_rate, + ) + + @classmethod + def from_file_using_temporary_files(cls, file, format=None, codec=None, parameters=None, start_second=None, duration=None, **kwargs): + orig_file = file + file, close_file = _fd_or_path_or_tempfile(file, 'rb', tempfile=False) + + if format: + format = format.lower() + format = AUDIO_FILE_EXT_ALIASES.get(format, format) + + def is_format(f): + f = f.lower() + if format == f: + return True + if isinstance(orig_file, basestring): + return orig_file.lower().endswith(".{0}".format(f)) + if isinstance(orig_file, bytes): + return orig_file.lower().endswith((".{0}".format(f)).encode('utf8')) + return False + + if is_format("wav"): + try: + obj = cls._from_safe_wav(file) + if close_file: + file.close() + if start_second is None and duration is None: + return obj + elif start_second is not None and duration is None: + return obj[start_second*1000:] + elif start_second is None and duration is not None: + return obj[:duration*1000] + else: + return obj[start_second*1000:(start_second+duration)*1000] + except: + file.seek(0) + elif is_format("raw") or is_format("pcm"): + sample_width = kwargs['sample_width'] + frame_rate = kwargs['frame_rate'] + channels = kwargs['channels'] + metadata = { + 'sample_width': sample_width, + 'frame_rate': frame_rate, + 'channels': channels, + 'frame_width': channels * sample_width + } + obj = cls(data=file.read(), metadata=metadata) + if close_file: + file.close() + if start_second is None and duration is None: + return obj + elif start_second is not None and duration is None: + return obj[start_second * 1000:] + elif start_second is None and duration is not None: + return obj[:duration * 1000] + else: + return obj[start_second * 1000:(start_second + duration) * 1000] + + input_file = NamedTemporaryFile(mode='wb', delete=False) + try: + input_file.write(file.read()) + except(OSError): + input_file.flush() + input_file.close() + input_file = NamedTemporaryFile(mode='wb', delete=False, buffering=2 ** 31 - 1) + if close_file: + file.close() + close_file = True + file = open(orig_file, buffering=2 ** 13 - 1, mode='rb') + reader = file.read(2 ** 31 - 1) + while reader: + input_file.write(reader) + reader = file.read(2 ** 31 - 1) + input_file.flush() + if close_file: + file.close() + + output = NamedTemporaryFile(mode="rb", delete=False) + + conversion_command = [cls.converter, + '-y', # always overwrite existing files + ] + + # If format is not defined + # ffmpeg/avconv will detect it automatically + if format: + conversion_command += ["-f", format] + + if codec: + # force audio decoder 
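+            # e.g. codec="pcm_s16le" would force that specific decoder;
+            # any decoder name known to the installed ffmpeg/avconv works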
+ conversion_command += ["-acodec", codec] + + conversion_command += [ + "-i", input_file.name, # input_file options (filename last) + "-vn", # Drop any video streams if there are any + "-f", "wav" # output options (filename last) + ] + + if start_second is not None: + conversion_command += ["-ss", str(start_second)] + + if duration is not None: + conversion_command += ["-t", str(duration)] + + conversion_command += [output.name] + + if parameters is not None: + # extend arguments with arbitrary set + conversion_command.extend(parameters) + + log_conversion(conversion_command) + + with open(os.devnull, 'rb') as devnull: + p = subprocess.Popen(conversion_command, stdin=devnull, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + p_out, p_err = p.communicate() + + log_subprocess_output(p_out) + log_subprocess_output(p_err) + + try: + if p.returncode != 0: + raise CouldntDecodeError( + "Decoding failed. ffmpeg returned error code: {0}\n\nOutput from ffmpeg/avlib:\n\n{1}".format( + p.returncode, p_err.decode(errors='ignore') )) + obj = cls._from_safe_wav(output) + finally: + input_file.close() + output.close() + os.unlink(input_file.name) + os.unlink(output.name) + + if start_second is None and duration is None: + return obj + elif start_second is not None and duration is None: + return obj[0:] + elif start_second is None and duration is not None: + return obj[:duration * 1000] + else: + return obj[0:duration * 1000] + + + @classmethod + def from_file(cls, file, format=None, codec=None, parameters=None, start_second=None, duration=None, **kwargs): + orig_file = file + try: + filename = fsdecode(file) + except TypeError: + filename = None + file, close_file = _fd_or_path_or_tempfile(file, 'rb', tempfile=False) + + if format: + format = format.lower() + format = AUDIO_FILE_EXT_ALIASES.get(format, format) + + def is_format(f): + f = f.lower() + if format == f: + return True + + if filename: + return filename.lower().endswith(".{0}".format(f)) + + return False + + if is_format("wav"): + try: + if start_second is None and duration is None: + return cls._from_safe_wav(file) + elif start_second is not None and duration is None: + return cls._from_safe_wav(file)[start_second*1000:] + elif start_second is None and duration is not None: + return cls._from_safe_wav(file)[:duration*1000] + else: + return cls._from_safe_wav(file)[start_second*1000:(start_second+duration)*1000] + except: + file.seek(0) + elif is_format("raw") or is_format("pcm"): + sample_width = kwargs['sample_width'] + frame_rate = kwargs['frame_rate'] + channels = kwargs['channels'] + metadata = { + 'sample_width': sample_width, + 'frame_rate': frame_rate, + 'channels': channels, + 'frame_width': channels * sample_width + } + if start_second is None and duration is None: + return cls(data=file.read(), metadata=metadata) + elif start_second is not None and duration is None: + return cls(data=file.read(), metadata=metadata)[start_second*1000:] + elif start_second is None and duration is not None: + return cls(data=file.read(), metadata=metadata)[:duration*1000] + else: + return cls(data=file.read(), metadata=metadata)[start_second*1000:(start_second+duration)*1000] + + conversion_command = [cls.converter, + '-y', # always overwrite existing files + ] + + # If format is not defined + # ffmpeg/avconv will detect it automatically + if format: + conversion_command += ["-f", format] + + if codec: + # force audio decoder + conversion_command += ["-acodec", codec] + + read_ahead_limit = kwargs.get('read_ahead_limit', -1) + if filename: + 
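+            # a real path was given, so let ffmpeg open the file itself;
+            # otherwise (below) the audio bytes are piped in over stdin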
conversion_command += ["-i", filename] + stdin_parameter = None + stdin_data = None + else: + if cls.converter == 'ffmpeg': + conversion_command += ["-read_ahead_limit", str(read_ahead_limit), + "-i", "cache:pipe:0"] + else: + conversion_command += ["-i", "-"] + stdin_parameter = subprocess.PIPE + stdin_data = file.read() + + if codec: + info = None + else: + info = mediainfo_json(orig_file, read_ahead_limit=read_ahead_limit) + if info: + audio_streams = [x for x in info['streams'] + if x['codec_type'] == 'audio'] + # This is a workaround for some ffprobe versions that always say + # that mp3/mp4/aac/webm/ogg files contain fltp samples + audio_codec = audio_streams[0].get('codec_name') + if (audio_streams[0].get('sample_fmt') == 'fltp' and + audio_codec in ['mp3', 'mp4', 'aac', 'webm', 'ogg']): + bits_per_sample = 16 + else: + bits_per_sample = audio_streams[0]['bits_per_sample'] + if bits_per_sample == 8: + acodec = 'pcm_u8' + else: + acodec = 'pcm_s%dle' % bits_per_sample + + conversion_command += ["-acodec", acodec] + + conversion_command += [ + "-vn", # Drop any video streams if there are any + "-f", "wav" # output options (filename last) + ] + + if start_second is not None: + conversion_command += ["-ss", str(start_second)] + + if duration is not None: + conversion_command += ["-t", str(duration)] + + conversion_command += ["-"] + + if parameters is not None: + # extend arguments with arbitrary set + conversion_command.extend(parameters) + + log_conversion(conversion_command) + + p = subprocess.Popen(conversion_command, stdin=stdin_parameter, + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + p_out, p_err = p.communicate(input=stdin_data) + + if p.returncode != 0 or len(p_out) == 0: + if close_file: + file.close() + raise CouldntDecodeError( + "Decoding failed. ffmpeg returned error code: {0}\n\nOutput from ffmpeg/avlib:\n\n{1}".format( + p.returncode, p_err.decode(errors='ignore') )) + + p_out = bytearray(p_out) + fix_wav_headers(p_out) + p_out = bytes(p_out) + obj = cls(p_out) + + if close_file: + file.close() + + if start_second is None and duration is None: + return obj + elif start_second is not None and duration is None: + return obj[0:] + elif start_second is None and duration is not None: + return obj[:duration * 1000] + else: + return obj[0:duration * 1000] + + @classmethod + def from_mp3(cls, file, parameters=None): + return cls.from_file(file, 'mp3', parameters=parameters) + + @classmethod + def from_flv(cls, file, parameters=None): + return cls.from_file(file, 'flv', parameters=parameters) + + @classmethod + def from_ogg(cls, file, parameters=None): + return cls.from_file(file, 'ogg', parameters=parameters) + + @classmethod + def from_wav(cls, file, parameters=None): + return cls.from_file(file, 'wav', parameters=parameters) + + @classmethod + def from_raw(cls, file, **kwargs): + return cls.from_file(file, 'raw', sample_width=kwargs['sample_width'], frame_rate=kwargs['frame_rate'], + channels=kwargs['channels']) + + @classmethod + def _from_safe_wav(cls, file): + file, close_file = _fd_or_path_or_tempfile(file, 'rb', tempfile=False) + file.seek(0) + obj = cls(data=file) + if close_file: + file.close() + return obj + + def export(self, out_f=None, format='mp3', codec=None, bitrate=None, parameters=None, tags=None, id3v2_version='4', + cover=None): + """ + Export an AudioSegment to a file with given options + + out_f (string): + Path to destination audio file. Also accepts os.PathLike objects on + python >= 3.6 + + format (string) + Format for destination audio file. 
+ ('mp3', 'wav', 'raw', 'ogg' or other ffmpeg/avconv supported files) + + codec (string) + Codec used to encode the destination file. + + bitrate (string) + Bitrate used when encoding destination file. (64, 92, 128, 256, 312k...) + Each codec accepts different bitrate arguments so take a look at the + ffmpeg documentation for details (bitrate usually shown as -b, -ba or + -a:b). + + parameters (list of strings) + Aditional ffmpeg/avconv parameters + + tags (dict) + Set metadata information to destination files + usually used as tags. ({title='Song Title', artist='Song Artist'}) + + id3v2_version (string) + Set ID3v2 version for tags. (default: '4') + + cover (file) + Set cover for audio file from image file. (png or jpg) + """ + id3v2_allowed_versions = ['3', '4'] + + if format == "raw" and (codec is not None or parameters is not None): + raise AttributeError( + 'Can not invoke ffmpeg when export format is "raw"; ' + 'specify an ffmpeg raw format like format="s16le" instead ' + 'or call export(format="raw") with no codec or parameters') + + out_f, _ = _fd_or_path_or_tempfile(out_f, 'wb+') + out_f.seek(0) + + if format == "raw": + out_f.write(self._data) + out_f.seek(0) + return out_f + + # wav with no ffmpeg parameters can just be written directly to out_f + easy_wav = format == "wav" and codec is None and parameters is None + + if easy_wav: + data = out_f + else: + data = NamedTemporaryFile(mode="wb", delete=False) + + pcm_for_wav = self._data + if self.sample_width == 1: + # convert to unsigned integers for wav + pcm_for_wav = audioop.bias(self._data, 1, 128) + + wave_data = wave.open(data, 'wb') + wave_data.setnchannels(self.channels) + wave_data.setsampwidth(self.sample_width) + wave_data.setframerate(self.frame_rate) + # For some reason packing the wave header struct with + # a float in python 2 doesn't throw an exception + wave_data.setnframes(int(self.frame_count())) + wave_data.writeframesraw(pcm_for_wav) + wave_data.close() + + # for easy wav files, we're done (wav data is written directly to out_f) + if easy_wav: + out_f.seek(0) + return out_f + + output = NamedTemporaryFile(mode="w+b", delete=False) + + # build converter command to export + conversion_command = [ + self.converter, + '-y', # always overwrite existing files + "-f", "wav", "-i", data.name, # input options (filename last) + ] + + if codec is None: + codec = self.DEFAULT_CODECS.get(format, None) + + if cover is not None: + if cover.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.tif', '.tiff')) and format == "mp3": + conversion_command.extend(["-i", cover, "-map", "0", "-map", "1", "-c:v", "mjpeg"]) + else: + raise AttributeError( + "Currently cover images are only supported by MP3 files. 
The allowed image formats are: .tif, .jpg, .bmp, .jpeg and .png.") + + if codec is not None: + # force audio encoder + conversion_command.extend(["-acodec", codec]) + + if bitrate is not None: + conversion_command.extend(["-b:a", bitrate]) + + if parameters is not None: + # extend arguments with arbitrary set + conversion_command.extend(parameters) + + if tags is not None: + if not isinstance(tags, dict): + raise InvalidTag("Tags must be a dictionary.") + else: + # Extend converter command with tags + # print(tags) + for key, value in tags.items(): + conversion_command.extend( + ['-metadata', '{0}={1}'.format(key, value)]) + + if format == 'mp3': + # set id3v2 tag version + if id3v2_version not in id3v2_allowed_versions: + raise InvalidID3TagVersion( + "id3v2_version not allowed, allowed versions: %s" % id3v2_allowed_versions) + conversion_command.extend([ + "-id3v2_version", id3v2_version + ]) + + if sys.platform == 'darwin' and codec == 'mp3': + conversion_command.extend(["-write_xing", "0"]) + + conversion_command.extend([ + "-f", format, output.name, # output options (filename last) + ]) + + log_conversion(conversion_command) + + # read stdin / write stdout + with open(os.devnull, 'rb') as devnull: + p = subprocess.Popen(conversion_command, stdin=devnull, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + p_out, p_err = p.communicate() + + log_subprocess_output(p_out) + log_subprocess_output(p_err) + + if p.returncode != 0: + raise CouldntEncodeError( + "Encoding failed. ffmpeg/avlib returned error code: {0}\n\nCommand:{1}\n\nOutput from ffmpeg/avlib:\n\n{2}".format( + p.returncode, conversion_command, p_err.decode(errors='ignore') )) + + output.seek(0) + out_f.write(output.read()) + + data.close() + output.close() + + os.unlink(data.name) + os.unlink(output.name) + + out_f.seek(0) + return out_f + + def get_frame(self, index): + frame_start = index * self.frame_width + frame_end = frame_start + self.frame_width + return self._data[frame_start:frame_end] + + def frame_count(self, ms=None): + """ + returns the number of frames for the given number of milliseconds, or + if not specified, the number of frames in the whole AudioSegment + """ + if ms is not None: + return ms * (self.frame_rate / 1000.0) + else: + return float(len(self._data) // self.frame_width) + + def set_sample_width(self, sample_width): + if sample_width == self.sample_width: + return self + + frame_width = self.channels * sample_width + + return self._spawn( + audioop.lin2lin(self._data, self.sample_width, sample_width), + overrides={'sample_width': sample_width, 'frame_width': frame_width} + ) + + def set_frame_rate(self, frame_rate): + if frame_rate == self.frame_rate: + return self + + if self._data: + converted, _ = audioop.ratecv(self._data, self.sample_width, + self.channels, self.frame_rate, + frame_rate, None) + else: + converted = self._data + + return self._spawn(data=converted, + overrides={'frame_rate': frame_rate}) + + def set_channels(self, channels): + if channels == self.channels: + return self + + if channels == 2 and self.channels == 1: + fn = audioop.tostereo + frame_width = self.frame_width * 2 + fac = 1 + converted = fn(self._data, self.sample_width, fac, fac) + elif channels == 1 and self.channels == 2: + fn = audioop.tomono + frame_width = self.frame_width // 2 + fac = 0.5 + converted = fn(self._data, self.sample_width, fac, fac) + elif channels == 1: + channels_data = [seg.get_array_of_samples() for seg in self.split_to_mono()] + frame_count = int(self.frame_count()) + converted = array.array( + 
channels_data[0].typecode, + b'\0' * (frame_count * self.sample_width) + ) + for raw_channel_data in channels_data: + for i in range(frame_count): + converted[i] += raw_channel_data[i] // self.channels + frame_width = self.frame_width // self.channels + elif self.channels == 1: + dup_channels = [self for iChannel in range(channels)] + return AudioSegment.from_mono_audiosegments(*dup_channels) + else: + raise ValueError( + "AudioSegment.set_channels only supports mono-to-multi channel and multi-to-mono channel conversion") + + return self._spawn(data=converted, + overrides={ + 'channels': channels, + 'frame_width': frame_width}) + + def split_to_mono(self): + if self.channels == 1: + return [self] + + samples = self.get_array_of_samples() + + mono_channels = [] + for i in range(self.channels): + samples_for_current_channel = samples[i::self.channels] + + try: + mono_data = samples_for_current_channel.tobytes() + except AttributeError: + mono_data = samples_for_current_channel.tostring() + + mono_channels.append( + self._spawn(mono_data, overrides={"channels": 1, "frame_width": self.sample_width}) + ) + + return mono_channels + + @property + def rms(self): + return audioop.rms(self._data, self.sample_width) + + @property + def dBFS(self): + rms = self.rms + if not rms: + return -float("infinity") + return ratio_to_db(self.rms / self.max_possible_amplitude) + + @property + def max(self): + return audioop.max(self._data, self.sample_width) + + @property + def max_possible_amplitude(self): + bits = self.sample_width * 8 + max_possible_val = (2 ** bits) + + # since half is above 0 and half is below the max amplitude is divided + return max_possible_val / 2 + + @property + def max_dBFS(self): + return ratio_to_db(self.max, self.max_possible_amplitude) + + @property + def duration_seconds(self): + return self.frame_rate and self.frame_count() / self.frame_rate or 0.0 + + def get_dc_offset(self, channel=1): + """ + Returns a value between -1.0 and 1.0 representing the DC offset of a + channel (1 for left, 2 for right). + """ + if not 1 <= channel <= 2: + raise ValueError("channel value must be 1 (left) or 2 (right)") + + if self.channels == 1: + data = self._data + elif channel == 1: + data = audioop.tomono(self._data, self.sample_width, 1, 0) + else: + data = audioop.tomono(self._data, self.sample_width, 0, 1) + + return float(audioop.avg(data, self.sample_width)) / self.max_possible_amplitude + + def remove_dc_offset(self, channel=None, offset=None): + """ + Removes DC offset of given channel. Calculates offset if it's not given. + Offset values must be in range -1.0 to 1.0. If channel is None, removes + DC offset from all available channels. 
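+
+        Example (illustrative)::
+
+            centered = seg.remove_dc_offset()            # all channels, measured offset
+            left_fixed = seg.remove_dc_offset(channel=1) # left channel only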
+        """
+        if channel and not 1 <= channel <= 2:
+            raise ValueError("channel value must be None, 1 (left) or 2 (right)")
+
+        if offset and not -1.0 <= offset <= 1.0:
+            raise ValueError("offset value must be in range -1.0 to 1.0")
+
+        if offset:
+            offset = int(round(offset * self.max_possible_amplitude))
+
+        def remove_data_dc(data, off):
+            if not off:
+                off = audioop.avg(data, self.sample_width)
+            return audioop.bias(data, self.sample_width, -off)
+
+        if self.channels == 1:
+            return self._spawn(data=remove_data_dc(self._data, offset))
+
+        left_channel = audioop.tomono(self._data, self.sample_width, 1, 0)
+        right_channel = audioop.tomono(self._data, self.sample_width, 0, 1)
+
+        if not channel or channel == 1:
+            left_channel = remove_data_dc(left_channel, offset)
+
+        if not channel or channel == 2:
+            right_channel = remove_data_dc(right_channel, offset)
+
+        left_channel = audioop.tostereo(left_channel, self.sample_width, 1, 0)
+        right_channel = audioop.tostereo(right_channel, self.sample_width, 0, 1)
+
+        return self._spawn(data=audioop.add(left_channel, right_channel,
+                                            self.sample_width))
+
+    def apply_gain(self, volume_change):
+        return self._spawn(data=audioop.mul(self._data, self.sample_width,
+                                            db_to_float(float(volume_change))))
+
+    def overlay(self, seg, position=0, loop=False, times=None, gain_during_overlay=None):
+        """
+        Overlay the provided segment on to this segment starting at the
+        specified position and using the specified looping behavior.
+
+        seg (AudioSegment):
+            The audio segment to overlay on to this one.
+
+        position (optional int):
+            The position to start overlaying the provided segment into this
+            one.
+
+        loop (optional bool):
+            Loop seg as many times as necessary to match this segment's
+            length. Overrides the times param.
+
+        times (optional int):
+            Loop seg the specified number of times or until it matches this
+            segment's length. 1 means once, 2 means twice, ... 0 would make
+            the call a no-op.
+
+        gain_during_overlay (optional int):
+            Changes this segment's volume by the specified amount during the
+            duration of time that seg is overlaid on top of it. When negative,
+            this has the effect of 'ducking' the audio under the overlay.
+        """
+
+        if loop:
+            # match loop=True's behavior with new times (count) mechanism.
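+            # -1 stays truthy while the loop below walks this segment, so the
+            # overlay repeats until it runs out of room (the final pass sets
+            # times to 1 explicitly)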
+ times = -1 + elif times is None: + # no times specified, just once through + times = 1 + elif times == 0: + # it's a no-op, make a copy since we never mutate + return self._spawn(self._data) + + output = StringIO() + + seg1, seg2 = AudioSegment._sync(self, seg) + sample_width = seg1.sample_width + spawn = seg1._spawn + + output.write(seg1[:position]._data) + + # drop down to the raw data + seg1 = seg1[position:]._data + seg2 = seg2._data + pos = 0 + seg1_len = len(seg1) + seg2_len = len(seg2) + while times: + remaining = max(0, seg1_len - pos) + if seg2_len >= remaining: + seg2 = seg2[:remaining] + seg2_len = remaining + # we've hit the end, we're done looping (if we were) and this + # is our last go-around + times = 1 + + if gain_during_overlay: + seg1_overlaid = seg1[pos:pos + seg2_len] + seg1_adjusted_gain = audioop.mul(seg1_overlaid, self.sample_width, + db_to_float(float(gain_during_overlay))) + output.write(audioop.add(seg1_adjusted_gain, seg2, sample_width)) + else: + output.write(audioop.add(seg1[pos:pos + seg2_len], seg2, + sample_width)) + pos += seg2_len + + # dec times to break our while loop (eventually) + times -= 1 + + output.write(seg1[pos:]) + + return spawn(data=output) + + def append(self, seg, crossfade=100): + seg1, seg2 = AudioSegment._sync(self, seg) + + if not crossfade: + return seg1._spawn(seg1._data + seg2._data) + elif crossfade > len(self): + raise ValueError("Crossfade is longer than the original AudioSegment ({}ms > {}ms)".format( + crossfade, len(self) + )) + elif crossfade > len(seg): + raise ValueError("Crossfade is longer than the appended AudioSegment ({}ms > {}ms)".format( + crossfade, len(seg) + )) + + xf = seg1[-crossfade:].fade(to_gain=-120, start=0, end=float('inf')) + xf *= seg2[:crossfade].fade(from_gain=-120, start=0, end=float('inf')) + + output = TemporaryFile() + + output.write(seg1[:-crossfade]._data) + output.write(xf._data) + output.write(seg2[crossfade:]._data) + + output.seek(0) + obj = seg1._spawn(data=output) + output.close() + return obj + + def fade(self, to_gain=0, from_gain=0, start=None, end=None, + duration=None): + """ + Fade the volume of this audio segment. 
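+        For example (illustrative), seg.fade(to_gain=-3.0, start=0,
+        duration=5000) ramps from 0dB down to -3dB over the first five
+        seconds and leaves the remainder at -3dB.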
+ + to_gain (float): + resulting volume_change in db + + start (int): + default = beginning of the segment + when in this segment to start fading in milliseconds + + end (int): + default = end of the segment + when in this segment to start fading in milliseconds + + duration (int): + default = until the end of the audio segment + the duration of the fade + """ + if None not in [duration, end, start]: + raise TypeError('Only two of the three arguments, "start", ' + '"end", and "duration" may be specified') + + # no fade == the same audio + if to_gain == 0 and from_gain == 0: + return self + + start = min(len(self), start) if start is not None else None + end = min(len(self), end) if end is not None else None + + if start is not None and start < 0: + start += len(self) + if end is not None and end < 0: + end += len(self) + + if duration is not None and duration < 0: + raise InvalidDuration("duration must be a positive integer") + + if duration: + if start is not None: + end = start + duration + elif end is not None: + start = end - duration + else: + duration = end - start + + from_power = db_to_float(from_gain) + + output = [] + + # original data - up until the crossfade portion, as is + before_fade = self[:start]._data + if from_gain != 0: + before_fade = audioop.mul(before_fade, + self.sample_width, + from_power) + output.append(before_fade) + + gain_delta = db_to_float(to_gain) - from_power + + # fades longer than 100ms can use coarse fading (one gain step per ms), + # shorter fades will have audible clicks so they use precise fading + # (one gain step per sample) + if duration > 100: + scale_step = gain_delta / duration + + for i in range(duration): + volume_change = from_power + (scale_step * i) + chunk = self[start + i] + chunk = audioop.mul(chunk._data, + self.sample_width, + volume_change) + + output.append(chunk) + else: + start_frame = self.frame_count(ms=start) + end_frame = self.frame_count(ms=end) + fade_frames = end_frame - start_frame + scale_step = gain_delta / fade_frames + + for i in range(int(fade_frames)): + volume_change = from_power + (scale_step * i) + sample = self.get_frame(int(start_frame + i)) + sample = audioop.mul(sample, self.sample_width, volume_change) + + output.append(sample) + + # original data after the crossfade portion, at the new volume + after_fade = self[end:]._data + if to_gain != 0: + after_fade = audioop.mul(after_fade, + self.sample_width, + db_to_float(to_gain)) + output.append(after_fade) + + return self._spawn(data=output) + + def fade_out(self, duration): + return self.fade(to_gain=-120, duration=duration, end=float('inf')) + + def fade_in(self, duration): + return self.fade(from_gain=-120, duration=duration, start=0) + + def reverse(self): + return self._spawn( + data=audioop.reverse(self._data, self.sample_width) + ) + + def _repr_html_(self): + src = """ + + """ + fh = self.export() + data = base64.b64encode(fh.read()).decode('ascii') + return src.format(base64=data) + + +from . 
import effects diff --git a/sbapp/pydub/effects.py b/sbapp/pydub/effects.py new file mode 100644 index 0000000..0210521 --- /dev/null +++ b/sbapp/pydub/effects.py @@ -0,0 +1,341 @@ +import sys +import math +import array +from .utils import ( + db_to_float, + ratio_to_db, + register_pydub_effect, + make_chunks, + audioop, + get_min_max_value +) +from .silence import split_on_silence +from .exceptions import TooManyMissingFrames, InvalidDuration + +if sys.version_info >= (3, 0): + xrange = range + + +@register_pydub_effect +def apply_mono_filter_to_each_channel(seg, filter_fn): + n_channels = seg.channels + + channel_segs = seg.split_to_mono() + channel_segs = [filter_fn(channel_seg) for channel_seg in channel_segs] + + out_data = seg.get_array_of_samples() + for channel_i, channel_seg in enumerate(channel_segs): + for sample_i, sample in enumerate(channel_seg.get_array_of_samples()): + index = (sample_i * n_channels) + channel_i + out_data[index] = sample + + return seg._spawn(out_data) + + +@register_pydub_effect +def normalize(seg, headroom=0.1): + """ + headroom is how close to the maximum volume to boost the signal up to (specified in dB) + """ + peak_sample_val = seg.max + + # if the max is 0, this audio segment is silent, and can't be normalized + if peak_sample_val == 0: + return seg + + target_peak = seg.max_possible_amplitude * db_to_float(-headroom) + + needed_boost = ratio_to_db(target_peak / peak_sample_val) + return seg.apply_gain(needed_boost) + + +@register_pydub_effect +def speedup(seg, playback_speed=1.5, chunk_size=150, crossfade=25): + # we will keep audio in 150ms chunks since one waveform at 20Hz is 50ms long + # (20 Hz is the lowest frequency audible to humans) + + # portion of AUDIO TO KEEP. if playback speed is 1.25 we keep 80% (0.8) and + # discard 20% (0.2) + atk = 1.0 / playback_speed + + if playback_speed < 2.0: + # throwing out more than half the audio - keep 50ms chunks + ms_to_remove_per_chunk = int(chunk_size * (1 - atk) / atk) + else: + # throwing out less than half the audio - throw out 50ms chunks + ms_to_remove_per_chunk = int(chunk_size) + chunk_size = int(atk * chunk_size / (1 - atk)) + + # the crossfade cannot be longer than the amount of audio we're removing + crossfade = min(crossfade, ms_to_remove_per_chunk - 1) + + # DEBUG + #print("chunk: {0}, rm: {1}".format(chunk_size, ms_to_remove_per_chunk)) + + chunks = make_chunks(seg, chunk_size + ms_to_remove_per_chunk) + if len(chunks) < 2: + raise Exception("Could not speed up AudioSegment, it was too short {2:0.2f}s for the current settings:\n{0}ms chunks at {1:0.1f}x speedup".format( + chunk_size, playback_speed, seg.duration_seconds)) + + # we'll actually truncate a bit less than we calculated to make up for the + # crossfade between chunks + ms_to_remove_per_chunk -= crossfade + + # we don't want to truncate the last chunk since it is not guaranteed to be + # the full chunk length + last_chunk = chunks[-1] + chunks = [chunk[:-ms_to_remove_per_chunk] for chunk in chunks[:-1]] + + out = chunks[0] + for chunk in chunks[1:]: + out = out.append(chunk, crossfade=crossfade) + + out += last_chunk + return out + + +@register_pydub_effect +def strip_silence(seg, silence_len=1000, silence_thresh=-16, padding=100): + if padding > silence_len: + raise InvalidDuration("padding cannot be longer than silence_len") + + chunks = split_on_silence(seg, silence_len, silence_thresh, padding) + crossfade = padding / 2 + + if not len(chunks): + return seg[0:0] + + seg = chunks[0] + for chunk in chunks[1:]: + seg = 
seg.append(chunk, crossfade=crossfade)
+
+    return seg
+
+
+@register_pydub_effect
+def compress_dynamic_range(seg, threshold=-20.0, ratio=4.0, attack=5.0, release=50.0):
+    """
+    Keyword Arguments:
+
+        threshold - default: -20.0
+            Threshold in dBFS. default of -20.0 means -20dB relative to the
+            maximum possible volume. 0dBFS is the maximum possible value so
+            all values for this argument should be negative.
+
+        ratio - default: 4.0
+            Compression ratio. Audio louder than the threshold will be
+            reduced to 1/ratio the volume. A ratio of 4.0 is equivalent to
+            a setting of 4:1 in a pro-audio compressor like the Waves C1.
+
+        attack - default: 5.0
+            Attack in milliseconds. How long it should take for the compressor
+            to kick in once the audio has exceeded the threshold.
+
+        release - default: 50.0
+            Release in milliseconds. How long it should take for the compressor
+            to stop compressing after the audio has fallen below the threshold.
+
+
+    For an overview of Dynamic Range Compression, and a more detailed
+    explanation of the related terminology, see:
+
+        http://en.wikipedia.org/wiki/Dynamic_range_compression
+    """
+
+    thresh_rms = seg.max_possible_amplitude * db_to_float(threshold)
+
+    look_frames = int(seg.frame_count(ms=attack))
+    def rms_at(frame_i):
+        return seg.get_sample_slice(frame_i - look_frames, frame_i).rms
+    def db_over_threshold(rms):
+        if rms == 0: return 0.0
+        db = ratio_to_db(rms / thresh_rms)
+        return max(db, 0)
+
+    output = []
+
+    # amount to reduce the volume of the audio by (in dB)
+    attenuation = 0.0
+
+    attack_frames = seg.frame_count(ms=attack)
+    release_frames = seg.frame_count(ms=release)
+    for i in xrange(int(seg.frame_count())):
+        rms_now = rms_at(i)
+
+        # with a ratio of 4.0 this means the volume will exceed the threshold
+        # by 1/4 the amount (of dB) that it would otherwise
+        max_attenuation = (1 - (1.0 / ratio)) * db_over_threshold(rms_now)
+
+        attenuation_inc = max_attenuation / attack_frames
+        attenuation_dec = max_attenuation / release_frames
+
+        if rms_now > thresh_rms and attenuation <= max_attenuation:
+            attenuation += attenuation_inc
+            attenuation = min(attenuation, max_attenuation)
+        else:
+            attenuation -= attenuation_dec
+            attenuation = max(attenuation, 0)
+
+        frame = seg.get_frame(i)
+        if attenuation != 0.0:
+            frame = audioop.mul(frame,
+                                seg.sample_width,
+                                db_to_float(-attenuation))
+
+        output.append(frame)
+
+    return seg._spawn(data=b''.join(output))
+
+
+# Invert the phase of the signal.
+@register_pydub_effect
+def invert_phase(seg, channels=(1, 1)):
+    """
+    channels - specifies which channel (left or right) to reverse the phase of.
+    Note that mono AudioSegments will become stereo.
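+
+    Example (illustrative): invert only the left channel of a stereo
+    segment, leaving the right channel untouched::
+
+        out = seg.invert_phase(channels=(1, 0))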
+ """ + if channels == (1, 1): + inverted = audioop.mul(seg._data, seg.sample_width, -1.0) + return seg._spawn(data=inverted) + + else: + if seg.channels == 2: + left, right = seg.split_to_mono() + else: + raise Exception("Can't implicitly convert an AudioSegment with " + str(seg.channels) + " channels to stereo.") + + if channels == (1, 0): + left = left.invert_phase() + else: + right = right.invert_phase() + + return seg.from_mono_audiosegments(left, right) + + + +# High and low pass filters based on implementation found on Stack Overflow: +# http://stackoverflow.com/questions/13882038/implementing-simple-high-and-low-pass-filters-in-c + +@register_pydub_effect +def low_pass_filter(seg, cutoff): + """ + cutoff - Frequency (in Hz) where higher frequency signal will begin to + be reduced by 6dB per octave (doubling in frequency) above this point + """ + RC = 1.0 / (cutoff * 2 * math.pi) + dt = 1.0 / seg.frame_rate + + alpha = dt / (RC + dt) + + original = seg.get_array_of_samples() + filteredArray = array.array(seg.array_type, original) + + frame_count = int(seg.frame_count()) + + last_val = [0] * seg.channels + for i in range(seg.channels): + last_val[i] = filteredArray[i] = original[i] + + for i in range(1, frame_count): + for j in range(seg.channels): + offset = (i * seg.channels) + j + last_val[j] = last_val[j] + (alpha * (original[offset] - last_val[j])) + filteredArray[offset] = int(last_val[j]) + + return seg._spawn(data=filteredArray) + + +@register_pydub_effect +def high_pass_filter(seg, cutoff): + """ + cutoff - Frequency (in Hz) where lower frequency signal will begin to + be reduced by 6dB per octave (doubling in frequency) below this point + """ + RC = 1.0 / (cutoff * 2 * math.pi) + dt = 1.0 / seg.frame_rate + + alpha = RC / (RC + dt) + + minval, maxval = get_min_max_value(seg.sample_width * 8) + + original = seg.get_array_of_samples() + filteredArray = array.array(seg.array_type, original) + + frame_count = int(seg.frame_count()) + + last_val = [0] * seg.channels + for i in range(seg.channels): + last_val[i] = filteredArray[i] = original[i] + + for i in range(1, frame_count): + for j in range(seg.channels): + offset = (i * seg.channels) + j + offset_minus_1 = ((i-1) * seg.channels) + j + + last_val[j] = alpha * (last_val[j] + original[offset] - original[offset_minus_1]) + filteredArray[offset] = int(min(max(last_val[j], minval), maxval)) + + return seg._spawn(data=filteredArray) + + +@register_pydub_effect +def pan(seg, pan_amount): + """ + pan_amount should be between -1.0 (100% left) and +1.0 (100% right) + + When pan_amount == 0.0 the left/right balance is not changed. + + Panning does not alter the *perceived* loundness, but since loudness + is decreasing on one side, the other side needs to get louder to + compensate. When panned hard left, the left channel will be 3dB louder. + """ + if not -1.0 <= pan_amount <= 1.0: + raise ValueError("pan_amount should be between -1.0 (100% left) and +1.0 (100% right)") + + max_boost_db = ratio_to_db(2.0) + boost_db = abs(pan_amount) * max_boost_db + + boost_factor = db_to_float(boost_db) + reduce_factor = db_to_float(max_boost_db) - boost_factor + + reduce_db = ratio_to_db(reduce_factor) + + # Cut boost in half (max boost== 3dB) - in reality 2 speakers + # do not sum to a full 6 dB. 
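+    # Worked example (pan_amount == 1.0): boost_db ~= 6.02dB, so boost_factor
+    # == 2.0 and reduce_factor == 0.0 (reduce_db == -inf, silencing the far
+    # channel); the halving below leaves roughly +3dB on the near channel.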
+    boost_db = boost_db / 2.0
+
+    if pan_amount < 0:
+        return seg.apply_gain_stereo(boost_db, reduce_db)
+    else:
+        return seg.apply_gain_stereo(reduce_db, boost_db)
+
+
+@register_pydub_effect
+def apply_gain_stereo(seg, left_gain=0.0, right_gain=0.0):
+    """
+    left_gain - amount of gain to apply to the left channel (in dB)
+    right_gain - amount of gain to apply to the right channel (in dB)
+
+    note: mono audio segments will be converted to stereo
+    """
+    if seg.channels == 1:
+        left = right = seg
+    elif seg.channels == 2:
+        left, right = seg.split_to_mono()
+
+    l_mult_factor = db_to_float(left_gain)
+    r_mult_factor = db_to_float(right_gain)
+
+    left_data = audioop.mul(left._data, left.sample_width, l_mult_factor)
+    left_data = audioop.tostereo(left_data, left.sample_width, 1, 0)
+
+    right_data = audioop.mul(right._data, right.sample_width, r_mult_factor)
+    right_data = audioop.tostereo(right_data, right.sample_width, 0, 1)
+
+    output = audioop.add(left_data, right_data, seg.sample_width)
+
+    return seg._spawn(data=output,
+                      overrides={'channels': 2,
+                                 'frame_width': 2 * seg.sample_width})
diff --git a/sbapp/pydub/exceptions.py b/sbapp/pydub/exceptions.py
new file mode 100644
index 0000000..79d0743
--- /dev/null
+++ b/sbapp/pydub/exceptions.py
@@ -0,0 +1,32 @@
+class PydubException(Exception):
+    """
+    Base class for any Pydub exception
+    """
+
+
+class TooManyMissingFrames(PydubException):
+    pass
+
+
+class InvalidDuration(PydubException):
+    pass
+
+
+class InvalidTag(PydubException):
+    pass
+
+
+class InvalidID3TagVersion(PydubException):
+    pass
+
+
+class CouldntDecodeError(PydubException):
+    pass
+
+
+class CouldntEncodeError(PydubException):
+    pass
+
+
+class MissingAudioParameter(PydubException):
+    pass
diff --git a/sbapp/pydub/generators.py b/sbapp/pydub/generators.py
new file mode 100644
index 0000000..b04cb4c
--- /dev/null
+++ b/sbapp/pydub/generators.py
@@ -0,0 +1,142 @@
+"""
+Each generator will return float samples from -1.0 to 1.0, which can be
+converted to actual audio with 8, 16, 24, or 32 bit depth using the
+SignalGenerator.to_audio_segment() method (on any of its subclasses).
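+
+For example (illustrative; the import path depends on how this copy of the
+package is vendored)::
+
+    from pydub.generators import Sine
+    tone = Sine(440).to_audio_segment(duration=2000, volume=-6.0)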
+ +See Wikipedia's "waveform" page for info on some of the generators included +here: http://en.wikipedia.org/wiki/Waveform +""" + +import math +import array +import itertools +import random +from .audio_segment import AudioSegment +from .utils import ( + db_to_float, + get_frame_width, + get_array_type, + get_min_max_value +) + + + +class SignalGenerator(object): + def __init__(self, sample_rate=44100, bit_depth=16): + self.sample_rate = sample_rate + self.bit_depth = bit_depth + + def to_audio_segment(self, duration=1000.0, volume=0.0): + """ + Duration in milliseconds + (default: 1 second) + Volume in DB relative to maximum amplitude + (default 0.0 dBFS, which is the maximum value) + """ + minval, maxval = get_min_max_value(self.bit_depth) + sample_width = get_frame_width(self.bit_depth) + array_type = get_array_type(self.bit_depth) + + gain = db_to_float(volume) + sample_count = int(self.sample_rate * (duration / 1000.0)) + + sample_data = (int(val * maxval * gain) for val in self.generate()) + sample_data = itertools.islice(sample_data, 0, sample_count) + + data = array.array(array_type, sample_data) + + try: + data = data.tobytes() + except: + data = data.tostring() + + return AudioSegment(data=data, metadata={ + "channels": 1, + "sample_width": sample_width, + "frame_rate": self.sample_rate, + "frame_width": sample_width, + }) + + def generate(self): + raise NotImplementedError("SignalGenerator subclasses must implement the generate() method, and *should not* call the superclass implementation.") + + + +class Sine(SignalGenerator): + def __init__(self, freq, **kwargs): + super(Sine, self).__init__(**kwargs) + self.freq = freq + + def generate(self): + sine_of = (self.freq * 2 * math.pi) / self.sample_rate + sample_n = 0 + while True: + yield math.sin(sine_of * sample_n) + sample_n += 1 + + + +class Pulse(SignalGenerator): + def __init__(self, freq, duty_cycle=0.5, **kwargs): + super(Pulse, self).__init__(**kwargs) + self.freq = freq + self.duty_cycle = duty_cycle + + def generate(self): + sample_n = 0 + + # in samples + cycle_length = self.sample_rate / float(self.freq) + pulse_length = cycle_length * self.duty_cycle + + while True: + if (sample_n % cycle_length) < pulse_length: + yield 1.0 + else: + yield -1.0 + sample_n += 1 + + + +class Square(Pulse): + def __init__(self, freq, **kwargs): + kwargs['duty_cycle'] = 0.5 + super(Square, self).__init__(freq, **kwargs) + + + +class Sawtooth(SignalGenerator): + def __init__(self, freq, duty_cycle=1.0, **kwargs): + super(Sawtooth, self).__init__(**kwargs) + self.freq = freq + self.duty_cycle = duty_cycle + + def generate(self): + sample_n = 0 + + # in samples + cycle_length = self.sample_rate / float(self.freq) + midpoint = cycle_length * self.duty_cycle + ascend_length = midpoint + descend_length = cycle_length - ascend_length + + while True: + cycle_position = sample_n % cycle_length + if cycle_position < midpoint: + yield (2 * cycle_position / ascend_length) - 1.0 + else: + yield 1.0 - (2 * (cycle_position - midpoint) / descend_length) + sample_n += 1 + + + +class Triangle(Sawtooth): + def __init__(self, freq, **kwargs): + kwargs['duty_cycle'] = 0.5 + super(Triangle, self).__init__(freq, **kwargs) + + +class WhiteNoise(SignalGenerator): + def generate(self): + while True: + yield (random.random() * 2) - 1.0 diff --git a/sbapp/pydub/logging_utils.py b/sbapp/pydub/logging_utils.py new file mode 100644 index 0000000..a312bd2 --- /dev/null +++ b/sbapp/pydub/logging_utils.py @@ -0,0 +1,14 @@ +""" + +""" +import logging + +converter_logger 
= logging.getLogger("pydub.converter") + +def log_conversion(conversion_command): + converter_logger.debug("subprocess.call(%s)", repr(conversion_command)) + +def log_subprocess_output(output): + if output: + for line in output.rstrip().splitlines(): + converter_logger.debug('subprocess output: %s', line.rstrip()) diff --git a/sbapp/pydub/playback.py b/sbapp/pydub/playback.py new file mode 100644 index 0000000..72ce4a5 --- /dev/null +++ b/sbapp/pydub/playback.py @@ -0,0 +1,71 @@ +""" +Support for playing AudioSegments. Pyaudio will be used if it's installed, +otherwise will fallback to ffplay. Pyaudio is a *much* nicer solution, but +is tricky to install. See my notes on installing pyaudio in a virtualenv (on +OSX 10.10): https://gist.github.com/jiaaro/9767512210a1d80a8a0d +""" + +import subprocess +from tempfile import NamedTemporaryFile +from .utils import get_player_name, make_chunks + +def _play_with_ffplay(seg): + PLAYER = get_player_name() + with NamedTemporaryFile("w+b", suffix=".wav") as f: + seg.export(f.name, "wav") + subprocess.call([PLAYER, "-nodisp", "-autoexit", "-hide_banner", f.name]) + + +def _play_with_pyaudio(seg): + import pyaudio + + p = pyaudio.PyAudio() + stream = p.open(format=p.get_format_from_width(seg.sample_width), + channels=seg.channels, + rate=seg.frame_rate, + output=True) + + # Just in case there were any exceptions/interrupts, we release the resource + # So as not to raise OSError: Device Unavailable should play() be used again + try: + # break audio into half-second chunks (to allows keyboard interrupts) + for chunk in make_chunks(seg, 500): + stream.write(chunk._data) + finally: + stream.stop_stream() + stream.close() + + p.terminate() + + +def _play_with_simpleaudio(seg): + import simpleaudio + return simpleaudio.play_buffer( + seg.raw_data, + num_channels=seg.channels, + bytes_per_sample=seg.sample_width, + sample_rate=seg.frame_rate + ) + + +def play(audio_segment): + try: + playback = _play_with_simpleaudio(audio_segment) + try: + playback.wait_done() + except KeyboardInterrupt: + playback.stop() + except ImportError: + pass + else: + return + + try: + _play_with_pyaudio(audio_segment) + return + except ImportError: + pass + else: + return + + _play_with_ffplay(audio_segment) diff --git a/sbapp/pydub/pyaudioop.py b/sbapp/pydub/pyaudioop.py new file mode 100644 index 0000000..9b1e2fb --- /dev/null +++ b/sbapp/pydub/pyaudioop.py @@ -0,0 +1,553 @@ +try: + from __builtin__ import max as builtin_max + from __builtin__ import min as builtin_min +except ImportError: + from builtins import max as builtin_max + from builtins import min as builtin_min +import math +import struct +try: + from fractions import gcd +except ImportError: # Python 3.9+ + from math import gcd +from ctypes import create_string_buffer + + +class error(Exception): + pass + + +def _check_size(size): + if size != 1 and size != 2 and size != 4: + raise error("Size should be 1, 2 or 4") + + +def _check_params(length, size): + _check_size(size) + if length % size != 0: + raise error("not a whole number of frames") + + +def _sample_count(cp, size): + return len(cp) / size + + +def _get_samples(cp, size, signed=True): + for i in range(_sample_count(cp, size)): + yield _get_sample(cp, size, i, signed) + + +def _struct_format(size, signed): + if size == 1: + return "b" if signed else "B" + elif size == 2: + return "h" if signed else "H" + elif size == 4: + return "i" if signed else "I" + + +def _get_sample(cp, size, i, signed=True): + fmt = _struct_format(size, signed) + start = i * size + end = 
start + size + return struct.unpack_from(fmt, buffer(cp)[start:end])[0] + + +def _put_sample(cp, size, i, val, signed=True): + fmt = _struct_format(size, signed) + struct.pack_into(fmt, cp, i * size, val) + + +def _get_maxval(size, signed=True): + if signed and size == 1: + return 0x7f + elif size == 1: + return 0xff + elif signed and size == 2: + return 0x7fff + elif size == 2: + return 0xffff + elif signed and size == 4: + return 0x7fffffff + elif size == 4: + return 0xffffffff + + +def _get_minval(size, signed=True): + if not signed: + return 0 + elif size == 1: + return -0x80 + elif size == 2: + return -0x8000 + elif size == 4: + return -0x80000000 + + +def _get_clipfn(size, signed=True): + maxval = _get_maxval(size, signed) + minval = _get_minval(size, signed) + return lambda val: builtin_max(min(val, maxval), minval) + + +def _overflow(val, size, signed=True): + minval = _get_minval(size, signed) + maxval = _get_maxval(size, signed) + if minval <= val <= maxval: + return val + + bits = size * 8 + if signed: + offset = 2**(bits-1) + return ((val + offset) % (2**bits)) - offset + else: + return val % (2**bits) + + +def getsample(cp, size, i): + _check_params(len(cp), size) + if not (0 <= i < len(cp) / size): + raise error("Index out of range") + return _get_sample(cp, size, i) + + +def max(cp, size): + _check_params(len(cp), size) + + if len(cp) == 0: + return 0 + + return builtin_max(abs(sample) for sample in _get_samples(cp, size)) + + +def minmax(cp, size): + _check_params(len(cp), size) + + max_sample, min_sample = 0, 0 + for sample in _get_samples(cp, size): + max_sample = builtin_max(sample, max_sample) + min_sample = builtin_min(sample, min_sample) + + return min_sample, max_sample + + +def avg(cp, size): + _check_params(len(cp), size) + sample_count = _sample_count(cp, size) + if sample_count == 0: + return 0 + return sum(_get_samples(cp, size)) / sample_count + + +def rms(cp, size): + _check_params(len(cp), size) + + sample_count = _sample_count(cp, size) + if sample_count == 0: + return 0 + + sum_squares = sum(sample**2 for sample in _get_samples(cp, size)) + return int(math.sqrt(sum_squares / sample_count)) + + +def _sum2(cp1, cp2, length): + size = 2 + total = 0 + for i in range(length): + total += getsample(cp1, size, i) * getsample(cp2, size, i) + return total + + +def findfit(cp1, cp2): + size = 2 + + if len(cp1) % 2 != 0 or len(cp2) % 2 != 0: + raise error("Strings should be even-sized") + + if len(cp1) < len(cp2): + raise error("First sample should be longer") + + len1 = _sample_count(cp1, size) + len2 = _sample_count(cp2, size) + + sum_ri_2 = _sum2(cp2, cp2, len2) + sum_aij_2 = _sum2(cp1, cp1, len2) + sum_aij_ri = _sum2(cp1, cp2, len2) + + result = (sum_ri_2 * sum_aij_2 - sum_aij_ri * sum_aij_ri) / sum_aij_2 + + best_result = result + best_i = 0 + + for i in range(1, len1 - len2 + 1): + aj_m1 = _get_sample(cp1, size, i - 1) + aj_lm1 = _get_sample(cp1, size, i + len2 - 1) + + sum_aij_2 += aj_lm1**2 - aj_m1**2 + sum_aij_ri = _sum2(buffer(cp1)[i*size:], cp2, len2) + + result = (sum_ri_2 * sum_aij_2 - sum_aij_ri * sum_aij_ri) / sum_aij_2 + + if result < best_result: + best_result = result + best_i = i + + factor = _sum2(buffer(cp1)[best_i*size:], cp2, len2) / sum_ri_2 + + return best_i, factor + + +def findfactor(cp1, cp2): + size = 2 + + if len(cp1) % 2 != 0: + raise error("Strings should be even-sized") + + if len(cp1) != len(cp2): + raise error("Samples should be same size") + + sample_count = _sample_count(cp1, size) + + sum_ri_2 = _sum2(cp2, cp2, sample_count) + 
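+    # least-squares gain estimate: the factor minimizing
+    # sum((cp1[i] - factor * cp2[i]) ** 2) is sum(cp1*cp2) / sum(cp2*cp2)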
sum_aij_ri = _sum2(cp1, cp2, sample_count) + + return sum_aij_ri / sum_ri_2 + + +def findmax(cp, len2): + size = 2 + sample_count = _sample_count(cp, size) + + if len(cp) % 2 != 0: + raise error("Strings should be even-sized") + + if len2 < 0 or sample_count < len2: + raise error("Input sample should be longer") + + if sample_count == 0: + return 0 + + result = _sum2(cp, cp, len2) + best_result = result + best_i = 0 + + for i in range(1, sample_count - len2 + 1): + sample_leaving_window = getsample(cp, size, i - 1) + sample_entering_window = getsample(cp, size, i + len2 - 1) + + result -= sample_leaving_window**2 + result += sample_entering_window**2 + + if result > best_result: + best_result = result + best_i = i + + return best_i + + +def avgpp(cp, size): + _check_params(len(cp), size) + sample_count = _sample_count(cp, size) + + prevextremevalid = False + prevextreme = None + avg = 0 + nextreme = 0 + + prevval = getsample(cp, size, 0) + val = getsample(cp, size, 1) + + prevdiff = val - prevval + + for i in range(1, sample_count): + val = getsample(cp, size, i) + diff = val - prevval + + if diff * prevdiff < 0: + if prevextremevalid: + avg += abs(prevval - prevextreme) + nextreme += 1 + + prevextremevalid = True + prevextreme = prevval + + prevval = val + if diff != 0: + prevdiff = diff + + if nextreme == 0: + return 0 + + return avg / nextreme + + +def maxpp(cp, size): + _check_params(len(cp), size) + sample_count = _sample_count(cp, size) + + prevextremevalid = False + prevextreme = None + max = 0 + + prevval = getsample(cp, size, 0) + val = getsample(cp, size, 1) + + prevdiff = val - prevval + + for i in range(1, sample_count): + val = getsample(cp, size, i) + diff = val - prevval + + if diff * prevdiff < 0: + if prevextremevalid: + extremediff = abs(prevval - prevextreme) + if extremediff > max: + max = extremediff + prevextremevalid = True + prevextreme = prevval + + prevval = val + if diff != 0: + prevdiff = diff + + return max + + +def cross(cp, size): + _check_params(len(cp), size) + + crossings = 0 + last_sample = 0 + for sample in _get_samples(cp, size): + if sample <= 0 < last_sample or sample >= 0 > last_sample: + crossings += 1 + last_sample = sample + + return crossings + + +def mul(cp, size, factor): + _check_params(len(cp), size) + clip = _get_clipfn(size) + + result = create_string_buffer(len(cp)) + + for i, sample in enumerate(_get_samples(cp, size)): + sample = clip(int(sample * factor)) + _put_sample(result, size, i, sample) + + return result.raw + + +def tomono(cp, size, fac1, fac2): + _check_params(len(cp), size) + clip = _get_clipfn(size) + + sample_count = _sample_count(cp, size) + + result = create_string_buffer(len(cp) / 2) + + for i in range(0, sample_count, 2): + l_sample = getsample(cp, size, i) + r_sample = getsample(cp, size, i + 1) + + sample = (l_sample * fac1) + (r_sample * fac2) + sample = clip(sample) + + _put_sample(result, size, i / 2, sample) + + return result.raw + + +def tostereo(cp, size, fac1, fac2): + _check_params(len(cp), size) + + sample_count = _sample_count(cp, size) + + result = create_string_buffer(len(cp) * 2) + clip = _get_clipfn(size) + + for i in range(sample_count): + sample = _get_sample(cp, size, i) + + l_sample = clip(sample * fac1) + r_sample = clip(sample * fac2) + + _put_sample(result, size, i * 2, l_sample) + _put_sample(result, size, i * 2 + 1, r_sample) + + return result.raw + + +def add(cp1, cp2, size): + _check_params(len(cp1), size) + + if len(cp1) != len(cp2): + raise error("Lengths should be the same") + + clip = 
_get_clipfn(size) + sample_count = _sample_count(cp1, size) + result = create_string_buffer(len(cp1)) + + for i in range(sample_count): + sample1 = getsample(cp1, size, i) + sample2 = getsample(cp2, size, i) + + sample = clip(sample1 + sample2) + + _put_sample(result, size, i, sample) + + return result.raw + + +def bias(cp, size, bias): + _check_params(len(cp), size) + + result = create_string_buffer(len(cp)) + + for i, sample in enumerate(_get_samples(cp, size)): + sample = _overflow(sample + bias, size) + _put_sample(result, size, i, sample) + + return result.raw + + +def reverse(cp, size): + _check_params(len(cp), size) + sample_count = _sample_count(cp, size) + + result = create_string_buffer(len(cp)) + for i, sample in enumerate(_get_samples(cp, size)): + _put_sample(result, size, sample_count - i - 1, sample) + + return result.raw + + +def lin2lin(cp, size, size2): + _check_params(len(cp), size) + _check_size(size2) + + if size == size2: + return cp + + new_len = (len(cp) / size) * size2 + + result = create_string_buffer(new_len) + + for i in range(_sample_count(cp, size)): + sample = _get_sample(cp, size, i) + if size < size2: + sample = sample << (4 * size2 / size) + elif size > size2: + sample = sample >> (4 * size / size2) + + sample = _overflow(sample, size2) + + _put_sample(result, size2, i, sample) + + return result.raw + + +def ratecv(cp, size, nchannels, inrate, outrate, state, weightA=1, weightB=0): + _check_params(len(cp), size) + if nchannels < 1: + raise error("# of channels should be >= 1") + + bytes_per_frame = size * nchannels + frame_count = len(cp) / bytes_per_frame + + if bytes_per_frame / nchannels != size: + raise OverflowError("width * nchannels too big for a C int") + + if weightA < 1 or weightB < 0: + raise error("weightA should be >= 1, weightB should be >= 0") + + if len(cp) % bytes_per_frame != 0: + raise error("not a whole number of frames") + + if inrate <= 0 or outrate <= 0: + raise error("sampling rate not > 0") + + d = gcd(inrate, outrate) + inrate /= d + outrate /= d + + prev_i = [0] * nchannels + cur_i = [0] * nchannels + + if state is None: + d = -outrate + else: + d, samps = state + + if len(samps) != nchannels: + raise error("illegal state argument") + + prev_i, cur_i = zip(*samps) + prev_i, cur_i = list(prev_i), list(cur_i) + + q = frame_count / inrate + ceiling = (q + 1) * outrate + nbytes = ceiling * bytes_per_frame + + result = create_string_buffer(nbytes) + + samples = _get_samples(cp, size) + out_i = 0 + while True: + while d < 0: + if frame_count == 0: + samps = zip(prev_i, cur_i) + retval = result.raw + + # slice off extra bytes + trim_index = (out_i * bytes_per_frame) - len(retval) + retval = buffer(retval)[:trim_index] + + return (retval, (d, tuple(samps))) + + for chan in range(nchannels): + prev_i[chan] = cur_i[chan] + cur_i[chan] = samples.next() + + cur_i[chan] = ( + (weightA * cur_i[chan] + weightB * prev_i[chan]) + / (weightA + weightB) + ) + + frame_count -= 1 + d += outrate + + while d >= 0: + for chan in range(nchannels): + cur_o = ( + (prev_i[chan] * d + cur_i[chan] * (outrate - d)) + / outrate + ) + _put_sample(result, size, out_i, _overflow(cur_o, size)) + out_i += 1 + d -= inrate + + +def lin2ulaw(cp, size): + raise NotImplementedError() + + +def ulaw2lin(cp, size): + raise NotImplementedError() + + +def lin2alaw(cp, size): + raise NotImplementedError() + + +def alaw2lin(cp, size): + raise NotImplementedError() + + +def lin2adpcm(cp, size, state): + raise NotImplementedError() + + +def adpcm2lin(cp, size, state): + raise 
NotImplementedError() diff --git a/sbapp/pydub/scipy_effects.py b/sbapp/pydub/scipy_effects.py new file mode 100644 index 0000000..abab2b4 --- /dev/null +++ b/sbapp/pydub/scipy_effects.py @@ -0,0 +1,175 @@ +""" +This module provides scipy versions of high_pass_filter, and low_pass_filter +as well as an additional band_pass_filter. + +Of course, you will need to install scipy for these to work. + +When this module is imported the high and low pass filters from this module +will be used when calling audio_segment.high_pass_filter() and +audio_segment.high_pass_filter() instead of the slower, less powerful versions +provided by pydub.effects. +""" +from scipy.signal import butter, sosfilt +from .utils import (register_pydub_effect,stereo_to_ms,ms_to_stereo) + + +def _mk_butter_filter(freq, type, order): + """ + Args: + freq: The cutoff frequency for highpass and lowpass filters. For + band filters, a list of [low_cutoff, high_cutoff] + type: "lowpass", "highpass", or "band" + order: nth order butterworth filter (default: 5th order). The + attenuation is -6dB/octave beyond the cutoff frequency (for 1st + order). A Higher order filter will have more attenuation, each level + adding an additional -6dB (so a 3rd order butterworth filter would + be -18dB/octave). + + Returns: + function which can filter a mono audio segment + + """ + def filter_fn(seg): + assert seg.channels == 1 + + nyq = 0.5 * seg.frame_rate + try: + freqs = [f / nyq for f in freq] + except TypeError: + freqs = freq / nyq + + sos = butter(order, freqs, btype=type, output='sos') + y = sosfilt(sos, seg.get_array_of_samples()) + + return seg._spawn(y.astype(seg.array_type)) + + return filter_fn + + +@register_pydub_effect +def band_pass_filter(seg, low_cutoff_freq, high_cutoff_freq, order=5): + filter_fn = _mk_butter_filter([low_cutoff_freq, high_cutoff_freq], 'band', order=order) + return seg.apply_mono_filter_to_each_channel(filter_fn) + + +@register_pydub_effect +def high_pass_filter(seg, cutoff_freq, order=5): + filter_fn = _mk_butter_filter(cutoff_freq, 'highpass', order=order) + return seg.apply_mono_filter_to_each_channel(filter_fn) + + +@register_pydub_effect +def low_pass_filter(seg, cutoff_freq, order=5): + filter_fn = _mk_butter_filter(cutoff_freq, 'lowpass', order=order) + return seg.apply_mono_filter_to_each_channel(filter_fn) + + +@register_pydub_effect +def _eq(seg, focus_freq, bandwidth=100, mode="peak", gain_dB=0, order=2): + """ + Args: + focus_freq - middle frequency or known frequency of band (in Hz) + bandwidth - range of the equalizer band + mode - Mode of Equalization(Peak/Notch(Bell Curve),High Shelf, Low Shelf) + order - Rolloff factor(1 - 6dB/Octave 2 - 12dB/Octave) + + Returns: + Equalized/Filtered AudioSegment + """ + filt_mode = ["peak", "low_shelf", "high_shelf"] + if mode not in filt_mode: + raise ValueError("Incorrect Mode Selection") + + if gain_dB >= 0: + if mode == "peak": + sec = band_pass_filter(seg, focus_freq - bandwidth/2, focus_freq + bandwidth/2, order = order) + seg = seg.overlay(sec - (3 - gain_dB)) + return seg + + if mode == "low_shelf": + sec = low_pass_filter(seg, focus_freq, order=order) + seg = seg.overlay(sec - (3 - gain_dB)) + return seg + + if mode == "high_shelf": + sec = high_pass_filter(seg, focus_freq, order=order) + seg = seg.overlay(sec - (3 - gain_dB)) + return seg + + if gain_dB < 0: + if mode == "peak": + sec = high_pass_filter(seg, focus_freq - bandwidth/2, order=order) + seg = seg.overlay(sec - (3 + gain_dB)) + gain_dB + sec = low_pass_filter(seg, focus_freq + 
bandwidth/2, order=order) + seg = seg.overlay(sec - (3 + gain_dB)) + gain_dB + return seg + + if mode == "low_shelf": + sec = high_pass_filter(seg, focus_freq, order=order) + seg = seg.overlay(sec - (3 + gain_dB)) + gain_dB + return seg + + if mode=="high_shelf": + sec=low_pass_filter(seg, focus_freq, order=order) + seg=seg.overlay(sec - (3 + gain_dB)) +gain_dB + return seg + + +@register_pydub_effect +def eq(seg, focus_freq, bandwidth=100, channel_mode="L+R", filter_mode="peak", gain_dB=0, order=2): + """ + Args: + focus_freq - middle frequency or known frequency of band (in Hz) + bandwidth - range of the equalizer band + channel_mode - Select Channels to be affected by the filter. + L+R - Standard Stereo Filter + L - Only Left Channel is Filtered + R - Only Right Channel is Filtered + M+S - Blumlien Stereo Filter(Mid-Side) + M - Only Mid Channel is Filtered + S - Only Side Channel is Filtered + Mono Audio Segments are completely filtered. + filter_mode - Mode of Equalization(Peak/Notch(Bell Curve),High Shelf, Low Shelf) + order - Rolloff factor(1 - 6dB/Octave 2 - 12dB/Octave) + + Returns: + Equalized/Filtered AudioSegment + """ + channel_modes = ["L+R", "M+S", "L", "R", "M", "S"] + if channel_mode not in channel_modes: + raise ValueError("Incorrect Channel Mode Selection") + + if seg.channels == 1: + return _eq(seg, focus_freq, bandwidth, filter_mode, gain_dB, order) + + if channel_mode == "L+R": + return _eq(seg, focus_freq, bandwidth, filter_mode, gain_dB, order) + + if channel_mode == "L": + seg = seg.split_to_mono() + seg = [_eq(seg[0], focus_freq, bandwidth, filter_mode, gain_dB, order), seg[1]] + return AudioSegment.from_mono_audio_segements(seg[0], seg[1]) + + if channel_mode == "R": + seg = seg.split_to_mono() + seg = [seg[0], _eq(seg[1], focus_freq, bandwidth, filter_mode, gain_dB, order)] + return AudioSegment.from_mono_audio_segements(seg[0], seg[1]) + + if channel_mode == "M+S": + seg = stereo_to_ms(seg) + seg = _eq(seg, focus_freq, bandwidth, filter_mode, gain_dB, order) + return ms_to_stereo(seg) + + if channel_mode == "M": + seg = stereo_to_ms(seg).split_to_mono() + seg = [_eq(seg[0], focus_freq, bandwidth, filter_mode, gain_dB, order), seg[1]] + seg = AudioSegment.from_mono_audio_segements(seg[0], seg[1]) + return ms_to_stereo(seg) + + if channel_mode == "S": + seg = stereo_to_ms(seg).split_to_mono() + seg = [seg[0], _eq(seg[1], focus_freq, bandwidth, filter_mode, gain_dB, order)] + seg = AudioSegment.from_mono_audio_segements(seg[0], seg[1]) + return ms_to_stereo(seg) + + diff --git a/sbapp/pydub/silence.py b/sbapp/pydub/silence.py new file mode 100644 index 0000000..0ad1499 --- /dev/null +++ b/sbapp/pydub/silence.py @@ -0,0 +1,182 @@ +""" +Various functions for finding/manipulating silence in AudioSegments +""" +import itertools + +from .utils import db_to_float + + +def detect_silence(audio_segment, min_silence_len=1000, silence_thresh=-16, seek_step=1): + """ + Returns a list of all silent sections [start, end] in milliseconds of audio_segment. 
+ Inverse of detect_nonsilent() + + audio_segment - the segment to find silence in + min_silence_len - the minimum length for any silent section + silence_thresh - the upper bound for how quiet is silent in dFBS + seek_step - step size for interating over the segment in ms + """ + seg_len = len(audio_segment) + + # you can't have a silent portion of a sound that is longer than the sound + if seg_len < min_silence_len: + return [] + + # convert silence threshold to a float value (so we can compare it to rms) + silence_thresh = db_to_float(silence_thresh) * audio_segment.max_possible_amplitude + + # find silence and add start and end indicies to the to_cut list + silence_starts = [] + + # check successive (1 sec by default) chunk of sound for silence + # try a chunk at every "seek step" (or every chunk for a seek step == 1) + last_slice_start = seg_len - min_silence_len + slice_starts = range(0, last_slice_start + 1, seek_step) + + # guarantee last_slice_start is included in the range + # to make sure the last portion of the audio is searched + if last_slice_start % seek_step: + slice_starts = itertools.chain(slice_starts, [last_slice_start]) + + for i in slice_starts: + audio_slice = audio_segment[i:i + min_silence_len] + if audio_slice.rms <= silence_thresh: + silence_starts.append(i) + + # short circuit when there is no silence + if not silence_starts: + return [] + + # combine the silence we detected into ranges (start ms - end ms) + silent_ranges = [] + + prev_i = silence_starts.pop(0) + current_range_start = prev_i + + for silence_start_i in silence_starts: + continuous = (silence_start_i == prev_i + seek_step) + + # sometimes two small blips are enough for one particular slice to be + # non-silent, despite the silence all running together. Just combine + # the two overlapping silent ranges. + silence_has_gap = silence_start_i > (prev_i + min_silence_len) + + if not continuous and silence_has_gap: + silent_ranges.append([current_range_start, + prev_i + min_silence_len]) + current_range_start = silence_start_i + prev_i = silence_start_i + + silent_ranges.append([current_range_start, + prev_i + min_silence_len]) + + return silent_ranges + + +def detect_nonsilent(audio_segment, min_silence_len=1000, silence_thresh=-16, seek_step=1): + """ + Returns a list of all nonsilent sections [start, end] in milliseconds of audio_segment. 
+ Inverse of detect_silent() + + audio_segment - the segment to find silence in + min_silence_len - the minimum length for any silent section + silence_thresh - the upper bound for how quiet is silent in dFBS + seek_step - step size for interating over the segment in ms + """ + silent_ranges = detect_silence(audio_segment, min_silence_len, silence_thresh, seek_step) + len_seg = len(audio_segment) + + # if there is no silence, the whole thing is nonsilent + if not silent_ranges: + return [[0, len_seg]] + + # short circuit when the whole audio segment is silent + if silent_ranges[0][0] == 0 and silent_ranges[0][1] == len_seg: + return [] + + prev_end_i = 0 + nonsilent_ranges = [] + for start_i, end_i in silent_ranges: + nonsilent_ranges.append([prev_end_i, start_i]) + prev_end_i = end_i + + if end_i != len_seg: + nonsilent_ranges.append([prev_end_i, len_seg]) + + if nonsilent_ranges[0] == [0, 0]: + nonsilent_ranges.pop(0) + + return nonsilent_ranges + + +def split_on_silence(audio_segment, min_silence_len=1000, silence_thresh=-16, keep_silence=100, + seek_step=1): + """ + Returns list of audio segments from splitting audio_segment on silent sections + + audio_segment - original pydub.AudioSegment() object + + min_silence_len - (in ms) minimum length of a silence to be used for + a split. default: 1000ms + + silence_thresh - (in dBFS) anything quieter than this will be + considered silence. default: -16dBFS + + keep_silence - (in ms or True/False) leave some silence at the beginning + and end of the chunks. Keeps the sound from sounding like it + is abruptly cut off. + When the length of the silence is less than the keep_silence duration + it is split evenly between the preceding and following non-silent + segments. + If True is specified, all the silence is kept, if False none is kept. + default: 100ms + + seek_step - step size for interating over the segment in ms + """ + + # from the itertools documentation + def pairwise(iterable): + "s -> (s0,s1), (s1,s2), (s2, s3), ..." + a, b = itertools.tee(iterable) + next(b, None) + return zip(a, b) + + if isinstance(keep_silence, bool): + keep_silence = len(audio_segment) if keep_silence else 0 + + output_ranges = [ + [ start - keep_silence, end + keep_silence ] + for (start,end) + in detect_nonsilent(audio_segment, min_silence_len, silence_thresh, seek_step) + ] + + for range_i, range_ii in pairwise(output_ranges): + last_end = range_i[1] + next_start = range_ii[0] + if next_start < last_end: + range_i[1] = (last_end+next_start)//2 + range_ii[0] = range_i[1] + + return [ + audio_segment[ max(start,0) : min(end,len(audio_segment)) ] + for start,end in output_ranges + ] + + +def detect_leading_silence(sound, silence_threshold=-50.0, chunk_size=10): + """ + Returns the millisecond/index that the leading silence ends. 
+ + audio_segment - the segment to find silence in + silence_threshold - the upper bound for how quiet is silent in dFBS + chunk_size - chunk size for interating over the segment in ms + """ + trim_ms = 0 # ms + assert chunk_size > 0 # to avoid infinite loop + while sound[trim_ms:trim_ms+chunk_size].dBFS < silence_threshold and trim_ms < len(sound): + trim_ms += chunk_size + + # if there is no end it should return the length of the segment + return min(trim_ms, len(sound)) + + diff --git a/sbapp/pydub/utils.py b/sbapp/pydub/utils.py new file mode 100644 index 0000000..740c500 --- /dev/null +++ b/sbapp/pydub/utils.py @@ -0,0 +1,434 @@ +from __future__ import division + +import json +import os +import re +import sys +from subprocess import Popen, PIPE +from math import log, ceil +from tempfile import TemporaryFile +from warnings import warn +from functools import wraps + +try: + import audioop +except ImportError: + import pyaudioop as audioop + +if sys.version_info >= (3, 0): + basestring = str + +FRAME_WIDTHS = { + 8: 1, + 16: 2, + 32: 4, +} +ARRAY_TYPES = { + 8: "b", + 16: "h", + 32: "i", +} +ARRAY_RANGES = { + 8: (-0x80, 0x7f), + 16: (-0x8000, 0x7fff), + 32: (-0x80000000, 0x7fffffff), +} + + +def get_frame_width(bit_depth): + return FRAME_WIDTHS[bit_depth] + + +def get_array_type(bit_depth, signed=True): + t = ARRAY_TYPES[bit_depth] + if not signed: + t = t.upper() + return t + + +def get_min_max_value(bit_depth): + return ARRAY_RANGES[bit_depth] + + +def _fd_or_path_or_tempfile(fd, mode='w+b', tempfile=True): + close_fd = False + if fd is None and tempfile: + fd = TemporaryFile(mode=mode) + close_fd = True + + if isinstance(fd, basestring): + fd = open(fd, mode=mode) + close_fd = True + + try: + if isinstance(fd, os.PathLike): + fd = open(fd, mode=mode) + close_fd = True + except AttributeError: + # module os has no attribute PathLike, so we're on python < 3.6. + # The protocol we're trying to support doesn't exist, so just pass. + pass + + return fd, close_fd + + +def db_to_float(db, using_amplitude=True): + """ + Converts the input db to a float, which represents the equivalent + ratio in power. + """ + db = float(db) + if using_amplitude: + return 10 ** (db / 20) + else: # using power + return 10 ** (db / 10) + + +def ratio_to_db(ratio, val2=None, using_amplitude=True): + """ + Converts the input float to db, which represents the equivalent + to the ratio in power represented by the multiplier passed in. + """ + ratio = float(ratio) + + # accept 2 values and use the ratio of val1 to val2 + if val2 is not None: + ratio = ratio / val2 + + # special case for multiply-by-zero (convert to silence) + if ratio == 0: + return -float('inf') + + if using_amplitude: + return 20 * log(ratio, 10) + else: # using power + return 10 * log(ratio, 10) + + +def register_pydub_effect(fn, name=None): + """ + decorator for adding pydub effects to the AudioSegment objects. + example use: + @register_pydub_effect + def normalize(audio_segment): + ... + or you can specify a name: + @register_pydub_effect("normalize") + def normalize_audio_segment(audio_segment): + ... + """ + if isinstance(fn, basestring): + name = fn + return lambda fn: register_pydub_effect(fn, name) + + if name is None: + name = fn.__name__ + + from .audio_segment import AudioSegment + setattr(AudioSegment, name, fn) + return fn + + +def make_chunks(audio_segment, chunk_length): + """ + Breaks an AudioSegment into chunks that are milliseconds + long. 
+ if chunk_length is 50 then you'll get a list of 50 millisecond long audio + segments back (except the last one, which can be shorter) + """ + number_of_chunks = ceil(len(audio_segment) / float(chunk_length)) + return [audio_segment[i * chunk_length:(i + 1) * chunk_length] + for i in range(int(number_of_chunks))] + + +def which(program): + """ + Mimics behavior of UNIX which command. + """ + # Add .exe program extension for windows support + if os.name == "nt" and not program.endswith(".exe"): + program += ".exe" + + envdir_list = [os.curdir] + os.environ["PATH"].split(os.pathsep) + + for envdir in envdir_list: + program_path = os.path.join(envdir, program) + if os.path.isfile(program_path) and os.access(program_path, os.X_OK): + return program_path + + +def get_encoder_name(): + """ + Return enconder default application for system, either avconv or ffmpeg + """ + if which("avconv"): + return "avconv" + elif which("ffmpeg"): + return "ffmpeg" + else: + # should raise exception + warn("Couldn't find ffmpeg or avconv - defaulting to ffmpeg, but may not work", RuntimeWarning) + return "ffmpeg" + + +def get_player_name(): + """ + Return enconder default application for system, either avconv or ffmpeg + """ + if which("avplay"): + return "avplay" + elif which("ffplay"): + return "ffplay" + else: + # should raise exception + warn("Couldn't find ffplay or avplay - defaulting to ffplay, but may not work", RuntimeWarning) + return "ffplay" + + +def get_prober_name(): + """ + Return probe application, either avconv or ffmpeg + """ + if which("avprobe"): + return "avprobe" + elif which("ffprobe"): + return "ffprobe" + else: + # should raise exception + warn("Couldn't find ffprobe or avprobe - defaulting to ffprobe, but may not work", RuntimeWarning) + return "ffprobe" + + +def fsdecode(filename): + """Wrapper for os.fsdecode which was introduced in python 3.2 .""" + + if sys.version_info >= (3, 2): + PathLikeTypes = (basestring, bytes) + if sys.version_info >= (3, 6): + PathLikeTypes += (os.PathLike,) + if isinstance(filename, PathLikeTypes): + return os.fsdecode(filename) + else: + if isinstance(filename, bytes): + return filename.decode(sys.getfilesystemencoding()) + if isinstance(filename, basestring): + return filename + + raise TypeError("type {0} not accepted by fsdecode".format(type(filename))) + + +def get_extra_info(stderr): + """ + avprobe sometimes gives more information on stderr than + on the json output. The information has to be extracted + from stderr of the format of: + ' Stream #0:0: Audio: flac, 88200 Hz, stereo, s32 (24 bit)' + or (macOS version): + ' Stream #0:0: Audio: vorbis' + ' 44100 Hz, stereo, fltp, 320 kb/s' + + :type stderr: str + :rtype: list of dict + """ + extra_info = {} + + re_stream = r'(?P +)Stream #0[:\.](?P([0-9]+))(?P.+)\n?(?! *Stream)((?P +)(?P.+))?' + for i in re.finditer(re_stream, stderr): + if i.group('space_end') is not None and len(i.group('space_start')) <= len( + i.group('space_end')): + content_line = ','.join([i.group('content_0'), i.group('content_1')]) + else: + content_line = i.group('content_0') + tokens = [x.strip() for x in re.split('[:,]', content_line) if x] + extra_info[int(i.group('stream_id'))] = tokens + return extra_info + + +def mediainfo_json(filepath, read_ahead_limit=-1): + """Return json dictionary with media info(codec, duration, size, bitrate...) 
from filepath + """ + prober = get_prober_name() + command_args = [ + "-v", "info", + "-show_format", + "-show_streams", + ] + try: + command_args += [fsdecode(filepath)] + stdin_parameter = None + stdin_data = None + except TypeError: + if prober == 'ffprobe': + command_args += ["-read_ahead_limit", str(read_ahead_limit), + "cache:pipe:0"] + else: + command_args += ["-"] + stdin_parameter = PIPE + file, close_file = _fd_or_path_or_tempfile(filepath, 'rb', tempfile=False) + file.seek(0) + stdin_data = file.read() + if close_file: + file.close() + + command = [prober, '-of', 'json'] + command_args + res = Popen(command, stdin=stdin_parameter, stdout=PIPE, stderr=PIPE) + output, stderr = res.communicate(input=stdin_data) + output = output.decode("utf-8", 'ignore') + stderr = stderr.decode("utf-8", 'ignore') + + info = json.loads(output) + + if not info: + # If ffprobe didn't give any information, just return it + # (for example, because the file doesn't exist) + return info + + extra_info = get_extra_info(stderr) + + audio_streams = [x for x in info['streams'] if x['codec_type'] == 'audio'] + if len(audio_streams) == 0: + return info + + # We just operate on the first audio stream in case there are more + stream = audio_streams[0] + + def set_property(stream, prop, value): + if prop not in stream or stream[prop] == 0: + stream[prop] = value + + for token in extra_info[stream['index']]: + m = re.match('([su]([0-9]{1,2})p?) \(([0-9]{1,2}) bit\)$', token) + m2 = re.match('([su]([0-9]{1,2})p?)( \(default\))?$', token) + if m: + set_property(stream, 'sample_fmt', m.group(1)) + set_property(stream, 'bits_per_sample', int(m.group(2))) + set_property(stream, 'bits_per_raw_sample', int(m.group(3))) + elif m2: + set_property(stream, 'sample_fmt', m2.group(1)) + set_property(stream, 'bits_per_sample', int(m2.group(2))) + set_property(stream, 'bits_per_raw_sample', int(m2.group(2))) + elif re.match('(flt)p?( \(default\))?$', token): + set_property(stream, 'sample_fmt', token) + set_property(stream, 'bits_per_sample', 32) + set_property(stream, 'bits_per_raw_sample', 32) + elif re.match('(dbl)p?( \(default\))?$', token): + set_property(stream, 'sample_fmt', token) + set_property(stream, 'bits_per_sample', 64) + set_property(stream, 'bits_per_raw_sample', 64) + return info + + +def mediainfo(filepath): + """Return dictionary with media info(codec, duration, size, bitrate...) 
from filepath + """ + + prober = get_prober_name() + command_args = [ + "-v", "quiet", + "-show_format", + "-show_streams", + filepath + ] + + command = [prober, '-of', 'old'] + command_args + res = Popen(command, stdout=PIPE) + output = res.communicate()[0].decode("utf-8") + + if res.returncode != 0: + command = [prober] + command_args + output = Popen(command, stdout=PIPE).communicate()[0].decode("utf-8") + + rgx = re.compile(r"(?:(?P.*?):)?(?P.*?)\=(?P.*?)$") + info = {} + + if sys.platform == 'win32': + output = output.replace("\r", "") + + for line in output.split("\n"): + # print(line) + mobj = rgx.match(line) + + if mobj: + # print(mobj.groups()) + inner_dict, key, value = mobj.groups() + + if inner_dict: + try: + info[inner_dict] + except KeyError: + info[inner_dict] = {} + info[inner_dict][key] = value + else: + info[key] = value + + return info + + +def cache_codecs(function): + cache = {} + + @wraps(function) + def wrapper(): + try: + return cache[0] + except: + cache[0] = function() + return cache[0] + + return wrapper + + +@cache_codecs +def get_supported_codecs(): + encoder = get_encoder_name() + command = [encoder, "-codecs"] + res = Popen(command, stdout=PIPE, stderr=PIPE) + output = res.communicate()[0].decode("utf-8") + if res.returncode != 0: + return [] + + if sys.platform == 'win32': + output = output.replace("\r", "") + + + rgx = re.compile(r"^([D.][E.][AVS.][I.][L.][S.]) (\w*) +(.*)") + decoders = set() + encoders = set() + for line in output.split('\n'): + match = rgx.match(line.strip()) + if not match: + continue + flags, codec, name = match.groups() + + if flags[0] == 'D': + decoders.add(codec) + + if flags[1] == 'E': + encoders.add(codec) + + return (decoders, encoders) + + +def get_supported_decoders(): + return get_supported_codecs()[0] + + +def get_supported_encoders(): + return get_supported_codecs()[1] + +def stereo_to_ms(audio_segment): + ''' + Left-Right -> Mid-Side + ''' + channel = audio_segment.split_to_mono() + channel = [channel[0].overlay(channel[1]), channel[0].overlay(channel[1].invert_phase())] + return AudioSegment.from_mono_audiosegments(channel[0], channel[1]) + +def ms_to_stereo(audio_segment): + ''' + Mid-Side -> Left-Right + ''' + channel = audio_segment.split_to_mono() + channel = [channel[0].overlay(channel[1]) - 3, channel[0].overlay(channel[1].invert_phase()) - 3] + return AudioSegment.from_mono_audiosegments(channel[0], channel[1]) +