riffusion-inference/riffusion/cli.py

279 lines
7.8 KiB
Python

"""
Command line tools for riffusion.
"""
import random
import typing as T
from multiprocessing.pool import ThreadPool
from pathlib import Path
import argh
import numpy as np
import pydub
import tqdm
from PIL import Image
from riffusion.spectrogram_image_converter import SpectrogramImageConverter
from riffusion.spectrogram_params import SpectrogramParams
from riffusion.util import image_util
@argh.arg("--step-size-ms", help="Duration of one pixel in the X axis of the spectrogram image")
@argh.arg("--num-frequencies", help="Number of Y axes in the spectrogram image")
def audio_to_image(
*,
audio: str,
image: str,
step_size_ms: int = 10,
num_frequencies: int = 512,
min_frequency: int = 0,
max_frequency: int = 10000,
window_duration_ms: int = 100,
padded_duration_ms: int = 400,
power_for_image: float = 0.25,
stereo: bool = False,
device: str = "cuda",
):
"""
Compute a spectrogram image from a waveform.
"""
segment = pydub.AudioSegment.from_file(audio)
params = SpectrogramParams(
sample_rate=segment.frame_rate,
stereo=stereo,
window_duration_ms=window_duration_ms,
padded_duration_ms=padded_duration_ms,
step_size_ms=step_size_ms,
min_frequency=min_frequency,
max_frequency=max_frequency,
num_frequencies=num_frequencies,
power_for_image=power_for_image,
)
converter = SpectrogramImageConverter(params=params, device=device)
pil_image = converter.spectrogram_image_from_audio(segment)
pil_image.save(image, exif=pil_image.getexif(), format="PNG")
print(f"Wrote {image}")
def print_exif(*, image: str) -> None:
"""
Print the params of a spectrogram image as saved in the exif data.
"""
pil_image = Image.open(image)
exif_data = image_util.exif_from_image(pil_image)
for name, value in exif_data.items():
print(f"{name:<20} = {value:>15}")
def image_to_audio(*, image: str, audio: str, device: str = "cuda"):
"""
Reconstruct an audio clip from a spectrogram image.
"""
pil_image = Image.open(image)
# Get parameters from image exif
img_exif = pil_image.getexif()
assert img_exif is not None
try:
params = SpectrogramParams.from_exif(exif=img_exif)
except (KeyError, AttributeError):
print("WARNING: Could not find spectrogram parameters in exif data. Using defaults.")
params = SpectrogramParams()
converter = SpectrogramImageConverter(params=params, device=device)
segment = converter.audio_from_spectrogram_image(pil_image)
extension = Path(audio).suffix[1:]
segment.export(audio, format=extension)
print(f"Wrote {audio} ({segment.duration_seconds:.2f} seconds)")
def sample_clips(
*,
audio: str,
output_dir: str,
num_clips: int = 1,
duration_ms: int = 5120,
mono: bool = False,
extension: str = "wav",
seed: int = -1,
):
"""
Slice an audio file into clips of the given duration.
"""
if seed >= 0:
np.random.seed(seed)
segment = pydub.AudioSegment.from_file(audio)
if mono:
segment = segment.set_channels(1)
output_dir_path = Path(output_dir)
if not output_dir_path.exists():
output_dir_path.mkdir(parents=True)
segment_duration_ms = int(segment.duration_seconds * 1000)
for i in range(num_clips):
clip_start_ms = np.random.randint(0, segment_duration_ms - duration_ms)
clip = segment[clip_start_ms : clip_start_ms + duration_ms]
clip_name = f"clip_{i}_start_{clip_start_ms}_ms_duration_{duration_ms}_ms.{extension}"
clip_path = output_dir_path / clip_name
clip.export(clip_path, format=extension)
print(f"Wrote {clip_path}")
def audio_to_images_batch(
*,
audio_dir: str,
output_dir: str,
image_extension: str = "jpg",
step_size_ms: int = 10,
num_frequencies: int = 512,
min_frequency: int = 0,
max_frequency: int = 10000,
power_for_image: float = 0.25,
mono: bool = False,
sample_rate: int = 44100,
device: str = "cuda",
num_threads: T.Optional[int] = None,
limit: int = -1,
):
"""
Process audio clips into spectrograms in batch, multi-threaded.
"""
audio_paths = list(Path(audio_dir).glob("*"))
audio_paths.sort()
if limit > 0:
audio_paths = audio_paths[:limit]
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
params = SpectrogramParams(
step_size_ms=step_size_ms,
num_frequencies=num_frequencies,
min_frequency=min_frequency,
max_frequency=max_frequency,
power_for_image=power_for_image,
stereo=not mono,
sample_rate=sample_rate,
)
converter = SpectrogramImageConverter(params=params, device=device)
def process_one(audio_path: Path) -> None:
# Load
try:
segment = pydub.AudioSegment.from_file(str(audio_path))
except Exception:
return
# TODO(hayk): Sanity checks on clip
if mono and segment.channels != 1:
segment = segment.set_channels(1)
elif not mono and segment.channels != 2:
segment = segment.set_channels(2)
# Frame rate
if segment.frame_rate != params.sample_rate:
segment = segment.set_frame_rate(params.sample_rate)
# Convert
image = converter.spectrogram_image_from_audio(segment)
# Save
image_path = output_path / f"{audio_path.stem}.{image_extension}"
image_format = {"jpg": "JPEG", "jpeg": "JPEG", "png": "PNG"}[image_extension]
image.save(image_path, exif=image.getexif(), format=image_format)
# Create thread pool
pool = ThreadPool(processes=num_threads)
with tqdm.tqdm(total=len(audio_paths)) as pbar:
for i, _ in enumerate(pool.imap_unordered(process_one, audio_paths)):
pbar.update()
def sample_clips_batch(
*,
audio_dir: str,
output_dir: str,
num_clips_per_file: int = 1,
duration_ms: int = 5120,
mono: bool = False,
extension: str = "mp3",
num_threads: T.Optional[int] = None,
glob: str = "*",
limit: int = -1,
seed: int = -1,
):
"""
Sample short clips from a directory of audio files, multi-threaded.
"""
audio_paths = list(Path(audio_dir).glob(glob))
audio_paths.sort()
# Exclude json
audio_paths = [p for p in audio_paths if p.suffix != ".json"]
if limit > 0:
audio_paths = audio_paths[:limit]
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
if seed >= 0:
random.seed(seed)
def process_one(audio_path: Path) -> None:
try:
segment = pydub.AudioSegment.from_file(str(audio_path))
except Exception:
return
if mono:
segment = segment.set_channels(1)
segment_duration_ms = int(segment.duration_seconds * 1000)
for i in range(num_clips_per_file):
try:
clip_start_ms = np.random.randint(0, segment_duration_ms - duration_ms)
except ValueError:
continue
clip = segment[clip_start_ms : clip_start_ms + duration_ms]
clip_name = (
f"{audio_path.stem}_{i}_"
f"start_{clip_start_ms}_ms_dur_{duration_ms}_ms.{extension}"
)
clip.export(output_path / clip_name, format=extension)
pool = ThreadPool(processes=num_threads)
with tqdm.tqdm(total=len(audio_paths)) as pbar:
for result in pool.imap_unordered(process_one, audio_paths):
pbar.update()
if __name__ == "__main__":
argh.dispatch_commands(
[
audio_to_image,
image_to_audio,
sample_clips,
print_exif,
audio_to_images_batch,
sample_clips_batch,
]
)