# Sideband/sbapp/pyogg/opus_file_stream.py
#
# 128 lines
# 3.8 KiB
# Python

import ctypes
from . import ogg
from . import opus
from .pyogg_error import PyOggError
class OpusFileStream:
    """Streams decoded PCM audio out of an OggOpus file.

    The file is opened with ``opus.op_open_file`` and decoded one chunk
    at a time with ``opus.op_read`` into a buffer owned by this object.
    The native handle is released with ``opus.op_free`` when the object
    is finalised.
    """

    def __init__(self, path):
        """Opens an OggOpus file as a stream.

        path should be a string giving the filename of the file to
        open.  Unicode file names may not work correctly.

        Raises PyOggError if the file cannot be opened correctly.
        """
        # Define the handle before anything below can raise, so that
        # __del__ always finds the attribute on a half-built instance.
        self.of = None

        error = ctypes.c_int()
        handle = opus.op_open_file(
            ogg.to_char_p(path),
            ctypes.pointer(error)
        )
        if error.value != 0:
            raise PyOggError(
                "file couldn't be opened or doesn't exist. "
                "Error code : {}".format(error.value)
            )
        self.of = handle

        #: Number of channels in audio file
        self.channels = opus.op_channel_count(self.of, -1)

        #: Total PCM Length
        self.pcm_size = opus.op_pcm_total(self.of, -1)

        #: Number of samples per second (per channel)
        self.frequency = 48000

        # The buffer size should be (per channel) large enough to
        # hold 120ms (the largest possible Opus frame) at 48kHz.
        # See https://opus-codec.org/docs/opusfile_api-0.7/group__stream__decoding.html#ga963c917749335e29bb2b698c1cb20a10
        self.buffer_size = self.frequency // 1000 * 120 * self.channels
        self.Buf = opus.opus_int16 * self.buffer_size
        self._buf = self.Buf()
        self.buffer_ptr = ctypes.cast(
            ctypes.pointer(self._buf),
            opus.opus_int16_p
        )

        #: Bytes per sample
        self.bytes_per_sample = ctypes.sizeof(opus.opus_int16)

    def __del__(self):
        """Releases the native opusfile handle, if one is still held."""
        # getattr: the attribute is missing when the instance was never
        # initialised (e.g. created through __new__).
        handle = getattr(self, "of", None)
        if handle is not None:
            opus.op_free(handle)
            # Forget the handle so a second finalisation cannot free
            # it twice.
            self.of = None

    def get_buffer(self):
        """Obtains the next chunk of PCM samples.

        Returns a bytes object holding the raw memory of the decoded
        signed 16-bit samples.  If the file is in stereo, the left and
        right channels are interleaved.

        Returns None when all data has been read.

        The returned bytes object is a copy of the internal decode
        buffer, so it stays valid across later calls to
        :meth:`~get_buffer` or :meth:`~get_buffer_as_array`.

        Raises PyOggError if the decoder reports an error.
        """
        # Read the next frame; the result is the number of samples
        # decoded per channel, or a negative error code.
        samples_read = opus.op_read(
            self.of,
            self.buffer_ptr,
            self.buffer_size,
            None
        )

        # Check for errors
        if samples_read < 0:
            raise PyOggError(
                "Failed to read OpusFileStream. Error {:d}".format(samples_read)
            )

        # Check if we've reached the end of the stream
        if samples_read == 0:
            return None

        # View only the filled part of the buffer: one value per
        # channel for every sample that was read.
        filled_type = opus.opus_int16 * (samples_read * self.channels)
        filled_ptr = ctypes.cast(
            self.buffer_ptr,
            ctypes.POINTER(filled_type)
        )

        # bytes() copies the data out of the reusable buffer
        return bytes(filled_ptr.contents)

    def get_buffer_as_array(self):
        """Provides the next chunk of PCM samples as a NumPy array.

        The array has dtype int16 and shape (samples, channels).

        Returns None when all data has been read.

        The array is built directly over the bytes object returned by
        :meth:`~get_buffer` (no additional copy is made), so it is
        read-only and is not affected by later reads from the stream.
        """
        import numpy  # type: ignore

        # Read the next samples from the stream
        buf = self.get_buffer()

        # Check if we've come to the end of the stream
        if buf is None:
            return None

        # Interpret the bytes buffer as 16-bit samples
        array = numpy.frombuffer(
            buf,
            dtype=numpy.int16
        )

        # One row per sample, one column per channel
        samples = len(buf) // self.bytes_per_sample // self.channels
        return array.reshape((samples, self.channels))