EveryDream2trainer/utils/sample_generator.py

354 lines
17 KiB
Python

import json
import logging
import os.path
from dataclasses import dataclass
import random
from typing import Generator, Callable, Any
import torch
from PIL import Image, ImageDraw, ImageFont
from colorama import Fore, Style
from diffusers import (StableDiffusionPipeline, DDIMScheduler, DPMSolverMultistepScheduler, DDPMScheduler,
PNDMScheduler, EulerDiscreteScheduler, EulerAncestralDiscreteScheduler, LMSDiscreteScheduler,
KDPM2AncestralDiscreteScheduler, DPMSolverSDEScheduler, DPMSolverSinglestepScheduler)
from torch import FloatTensor
from torch.cuda.amp import autocast
from torch.utils.tensorboard import SummaryWriter
from torchvision import transforms
from tqdm.auto import tqdm
from compel import Compel
def clean_filename(filename):
"""
removes all non-alphanumeric characters from a string so it is safe to use as a filename
"""
return "".join([c for c in filename if c.isalpha() or c.isdigit() or c==' ']).rstrip()
@dataclass
class SampleRequest:
prompt: str
negative_prompt: str
seed: int
size: tuple[int,int]
wants_random_caption: bool = False
def __str__(self):
rep = self.prompt
if len(self.negative_prompt) > 0:
rep += f"\n negative prompt: {self.negative_prompt}"
rep += f"\n seed: {self.seed}"
return rep
def chunk_list(l: list, batch_size: int,
compatibility_test: Callable[[Any,Any], bool]=lambda x,y: True
) -> Generator[list, None, None]:
buckets = []
for item in l:
compatible_bucket = next((b for b in buckets if compatibility_test(item, b[0])), None)
if compatible_bucket is not None:
compatible_bucket.append(item)
else:
buckets.append([item])
for b in buckets:
for i in range(0, len(b), batch_size):
yield b[i:i + batch_size]
def get_best_size_for_aspect_ratio(aspect_ratio, default_resolution) -> tuple[int, int]:
sizes = []
target_pixel_count = default_resolution * default_resolution
for w in range(256, 1024, 64):
for h in range(256, 1024, 64):
if abs((w * h) - target_pixel_count) <= 128 * 64:
sizes.append((w, h))
best_size = min(sizes, key=lambda s: abs(1 - (aspect_ratio / (s[0] / s[1]))))
return best_size
class SampleGenerator:
seed: int
default_resolution: int
cfgs: list[float] = [7, 4, 1.01]
scheduler: str = 'ddim'
num_inference_steps: int = 30
random_captions = False
sample_requests: [str]
log_folder: str
log_writer: SummaryWriter
def __init__(self,
log_folder: str,
log_writer: SummaryWriter,
default_resolution: int,
config_file_path: str,
batch_size: int,
default_seed: int,
default_sample_steps: int,
use_xformers: bool,
use_penultimate_clip_layer: bool,
guidance_rescale: float = 0):
self.log_folder = log_folder
self.log_writer = log_writer
self.batch_size = batch_size
self.config_file_path = config_file_path
self.use_xformers = use_xformers
self.show_progress_bars = False
self.generate_pretrain_samples = False
self.use_penultimate_clip_layer = use_penultimate_clip_layer
self.guidance_rescale = guidance_rescale
self.default_resolution = default_resolution
self.default_seed = default_seed
self.sample_steps = default_sample_steps
self.sample_requests = None
self.reload_config()
print(f" * SampleGenerator initialized with {len(self.sample_requests)} prompts, generating samples every {self.sample_steps} training steps, using scheduler '{self.scheduler}' with {self.num_inference_steps} inference steps")
if not os.path.exists(f"{log_folder}/samples/"):
os.makedirs(f"{log_folder}/samples/")
def reload_config(self):
try:
config_file_extension = os.path.splitext(self.config_file_path)[1].lower()
if config_file_extension == '.txt':
self._reload_sample_prompts_txt(self.config_file_path)
elif config_file_extension == '.json':
self._reload_config_json(self.config_file_path)
else:
raise ValueError(f"Unrecognized file type '{config_file_extension}' for sample config, must be .txt or .json")
except Exception as e:
logging.warning(
f" * {Fore.LIGHTYELLOW_EX}Error trying to read sample config from {self.config_file_path}: {Style.RESET_ALL}{e}")
logging.warning(
f" Edit {self.config_file_path} to fix the problem. It will be automatically reloaded next time samples are due to be generated."
)
if self.sample_requests == None:
logging.warning(
f" Will generate samples from random training image captions until the problem is fixed.")
self.sample_requests = self._make_random_caption_sample_requests()
def update_random_captions(self, possible_captions: list[str]):
random_prompt_sample_requests = [r for r in self.sample_requests if r.wants_random_caption]
for i, r in enumerate(random_prompt_sample_requests):
r.prompt = possible_captions[i % len(possible_captions)]
def _reload_sample_prompts_txt(self, path):
with open(path, 'rt') as f:
self.sample_requests = [SampleRequest(prompt=line.strip(),
negative_prompt='',
seed=self.default_seed,
size=(self.default_resolution, self.default_resolution)
) for line in f]
if len(self.sample_requests) == 0:
self.sample_requests = self._make_random_caption_sample_requests()
def _make_random_caption_sample_requests(self):
num_random_captions = min(4, self.batch_size)
return [SampleRequest(prompt='',
negative_prompt='',
seed=self.default_seed,
size=(self.default_resolution, self.default_resolution),
wants_random_caption=True)
for _ in range(num_random_captions)]
def _reload_config_json(self, path):
with open(path, 'rt') as f:
config = json.load(f)
# if keys are missing, keep current values
self.default_resolution = config.get('resolution', self.default_resolution)
self.cfgs = config.get('cfgs', self.cfgs)
self.batch_size = config.get('batch_size', self.batch_size)
self.scheduler = config.get('scheduler', self.scheduler)
self.num_inference_steps = config.get('num_inference_steps', self.num_inference_steps)
self.show_progress_bars = config.get('show_progress_bars', self.show_progress_bars)
self.generate_pretrain_samples = config.get('generate_pretrain_samples', self.generate_pretrain_samples)
self.sample_steps = config.get('generate_samples_every_n_steps', self.sample_steps)
sample_requests_config = config.get('samples', None)
if sample_requests_config is None:
self.sample_requests = self._make_random_caption_sample_requests()
else:
default_seed = config.get('seed', self.default_seed)
self.sample_requests = [SampleRequest(prompt=p.get('prompt', ''),
negative_prompt=p.get('negative_prompt', ''),
seed=p.get('seed', default_seed),
size=tuple(p.get('size', None) or
get_best_size_for_aspect_ratio(p.get('aspect_ratio', 1), self.default_resolution)),
wants_random_caption=p.get('random_caption', False)
) for p in sample_requests_config]
if len(self.sample_requests) == 0:
self.sample_requests = self._make_random_caption_sample_requests()
@torch.no_grad()
def generate_samples(self, pipe: StableDiffusionPipeline, global_step: int, extra_info: str = ""):
"""
generates samples at different cfg scales and saves them to disk
"""
disable_progress_bars = not self.show_progress_bars
try:
font = ImageFont.truetype(font="arial.ttf", size=20)
except:
font = ImageFont.load_default()
if not self.show_progress_bars:
print(f" * Generating samples at gs:{global_step} for {len(self.sample_requests)} prompts")
sample_index = 0
with autocast():
batch: list[SampleRequest]
def sample_compatibility_test(a: SampleRequest, b: SampleRequest) -> bool:
return a.size == b.size
batches = list(chunk_list(self.sample_requests, self.batch_size,
compatibility_test=sample_compatibility_test))
pbar = tqdm(total=len(batches), disable=disable_progress_bars, position=1, leave=False,
desc=f"{Fore.YELLOW}Image samples (batches of {self.batch_size}){Style.RESET_ALL}")
compel = Compel(tokenizer=pipe.tokenizer,
text_encoder=pipe.text_encoder,
use_penultimate_clip_layer=self.use_penultimate_clip_layer)
for batch in batches:
prompts = [p.prompt for p in batch]
negative_prompts = [p.negative_prompt for p in batch]
seeds = [(p.seed if p.seed != -1 else random.randint(0, 2 ** 30))
for p in batch]
# all sizes in a batch are the same
size = batch[0].size
generators = [torch.Generator(pipe.device).manual_seed(seed) for seed in seeds]
batch_images = []
for cfg in self.cfgs:
pipe.set_progress_bar_config(disable=disable_progress_bars, position=2, leave=False,
desc=f"{Fore.LIGHTYELLOW_EX}CFG scale {cfg}{Style.RESET_ALL}")
prompt_embeds = compel(prompts)
negative_prompt_embeds = compel(negative_prompts)
images = pipe(prompt_embeds=prompt_embeds,
negative_prompt_embeds=negative_prompt_embeds,
num_inference_steps=self.num_inference_steps,
num_images_per_prompt=1,
guidance_scale=cfg,
generator=generators,
width=size[0],
height=size[1],
guidance_rescale=self.guidance_rescale
).images
for image in images:
draw = ImageDraw.Draw(image)
print_msg = f"cfg:{cfg:.1f}"
l, t, r, b = draw.textbbox(xy=(0, 0), text=print_msg, font=font)
text_width = r - l
text_height = b - t
x = float(image.width - text_width - 10)
y = float(image.height - text_height - 10)
draw.rectangle((x, y, image.width, image.height), fill="white")
draw.text((x, y), print_msg, fill="black", font=font)
batch_images.append(images)
del images
del generators
#print("batch_images:", batch_images)
width = size[0] * len(self.cfgs)
height = size[1]
for prompt_idx in range(len(batch)):
#print(f"batch_images[:][{prompt_idx}]: {batch_images[:][prompt_idx]}")
result = Image.new('RGB', (width, height))
x_offset = 0
for cfg_idx in range(len(self.cfgs)):
image = batch_images[cfg_idx][prompt_idx]
result.paste(image, (x_offset, 0))
x_offset += image.width
prompt = prompts[prompt_idx]
clean_prompt = clean_filename(prompt)
result.save(f"{self.log_folder}/samples/gs{global_step:05}-{sample_index}-{extra_info}{clean_prompt[:100]}.jpg", format="JPEG", quality=95, optimize=True, progressive=False)
with open(f"{self.log_folder}/samples/gs{global_step:05}-{sample_index}-{extra_info}{clean_prompt[:100]}.txt", "w", encoding='utf-8') as f:
f.write(str(batch[prompt_idx]))
tfimage = transforms.ToTensor()(result)
if batch[prompt_idx].wants_random_caption:
self.log_writer.add_image(tag=f"sample_{sample_index}{extra_info}", img_tensor=tfimage, global_step=global_step)
else:
self.log_writer.add_image(tag=f"sample_{sample_index}_{extra_info}{clean_prompt[:100]}", img_tensor=tfimage, global_step=global_step)
sample_index += 1
del result
del tfimage
del batch_images
pbar.update(1)
@torch.no_grad()
def create_inference_pipe(self, unet, text_encoder, tokenizer, vae, diffusers_scheduler_config: dict):
"""
creates a pipeline for SD inference
"""
scheduler = self._create_scheduler(diffusers_scheduler_config)
pipe = StableDiffusionPipeline(
vae=vae,
text_encoder=text_encoder,
tokenizer=tokenizer,
unet=unet,
scheduler=scheduler,
safety_checker=None, # save vram
requires_safety_checker=None, # avoid nag
feature_extractor=None, # must be None if no safety checker
)
if self.use_xformers:
pipe.enable_xformers_memory_efficient_attention()
return pipe
@torch.no_grad()
def _create_scheduler(self, scheduler_config: dict):
scheduler = self.scheduler
if scheduler not in ['ddim', 'pndm', 'ddpm', 'lms', 'euler', 'euler_a', 'kdpm2', 'dpm++',
'dpm++_2s', 'dpm++_2m', 'dpm++_sde', 'dpm++_2m_sde',
'dpm++_2s_k', 'dpm++_2m_k', 'dpm++_sde_k', 'dpm++_2m_sde_k']:
print(f"unsupported scheduler '{self.scheduler}', falling back to ddim")
scheduler = 'ddim'
if scheduler == 'ddim':
return DDIMScheduler.from_config(scheduler_config)
elif scheduler == 'dpm++_2s':
return DPMSolverSinglestepScheduler.from_config(scheduler_config, use_karras_sigmas=False)
elif scheduler == 'dpm++_2s_k':
return DPMSolverSinglestepScheduler.from_config(scheduler_config, use_karras_sigmas=True)
elif scheduler == 'dpm++' or scheduler == 'dpm++_2m':
return DPMSolverMultistepScheduler.from_config(scheduler_config, algorithm_type="dpmsolver++", use_karras_sigmas=False)
elif scheduler == 'dpm++_2m_k':
return DPMSolverMultistepScheduler.from_config(scheduler_config, algorithm_type="dpmsolver++", use_karras_sigmas=True)
elif scheduler == 'dpm++_sde':
return DPMSolverSDEScheduler.from_config(scheduler_config, use_karras_sigmas=False, noise_sampler_seed=0)
elif scheduler == 'dpm++_sde_k':
return DPMSolverSDEScheduler.from_config(scheduler_config, use_karras_sigmas=True, noise_sampler_seed=0)
elif scheduler == 'dpm++_2m_sde':
return DPMSolverMultistepScheduler.from_config(scheduler_config, algorithm_type="sde-dpmsolver++", use_karras_sigmas=False)
elif scheduler == 'dpm++_2m_sde_k':
return DPMSolverMultistepScheduler.from_config(scheduler_config, algorithm_type="sde-dpmsolver++", use_karras_sigmas=True)
elif scheduler == 'pndm':
return PNDMScheduler.from_config(scheduler_config)
elif scheduler == 'ddpm':
return DDPMScheduler.from_config(scheduler_config)
elif scheduler == 'lms':
return LMSDiscreteScheduler.from_config(scheduler_config)
elif scheduler == 'euler':
return EulerDiscreteScheduler.from_config(scheduler_config)
elif scheduler == 'euler_a':
return EulerAncestralDiscreteScheduler.from_config(scheduler_config)
elif scheduler == 'kdpm2':
return KDPM2AncestralDiscreteScheduler.from_config(scheduler_config)
else:
raise ValueError(f"unknown scheduler '{scheduler}'")