Add/modify CFG callbacks

Required by self-attn guidance extension
https://github.com/ashen-sensored/sd_webui_SAG
This commit is contained in:
catboxanon 2023-05-14 01:49:41 +00:00
parent e8eea1bb7a
commit 3078001439
2 changed files with 42 additions and 1 deletions

View File

@ -53,6 +53,21 @@ class CFGDenoiserParams:
class CFGDenoisedParams: class CFGDenoisedParams:
def __init__(self, x, sampling_step, total_sampling_steps, inner_model):
self.x = x
"""Latent image representation in the process of being denoised"""
self.sampling_step = sampling_step
"""Current Sampling step number"""
self.total_sampling_steps = total_sampling_steps
"""Total number of sampling steps planned"""
self.inner_model = inner_model
"""Inner model reference that is being used for denoising"""
class AfterCFGCallbackParams:
def __init__(self, x, sampling_step, total_sampling_steps): def __init__(self, x, sampling_step, total_sampling_steps):
self.x = x self.x = x
"""Latent image representation in the process of being denoised""" """Latent image representation in the process of being denoised"""
@ -63,6 +78,9 @@ class CFGDenoisedParams:
self.total_sampling_steps = total_sampling_steps self.total_sampling_steps = total_sampling_steps
"""Total number of sampling steps planned""" """Total number of sampling steps planned"""
self.output_altered = False
"""A flag for CFGDenoiser that indicates whether the output has been altered by the callback"""
class UiTrainTabParams: class UiTrainTabParams:
def __init__(self, txt2img_preview_params): def __init__(self, txt2img_preview_params):
@ -87,6 +105,7 @@ callback_map = dict(
callbacks_image_saved=[], callbacks_image_saved=[],
callbacks_cfg_denoiser=[], callbacks_cfg_denoiser=[],
callbacks_cfg_denoised=[], callbacks_cfg_denoised=[],
callbacks_cfg_after_cfg=[],
callbacks_before_component=[], callbacks_before_component=[],
callbacks_after_component=[], callbacks_after_component=[],
callbacks_image_grid=[], callbacks_image_grid=[],
@ -186,6 +205,14 @@ def cfg_denoised_callback(params: CFGDenoisedParams):
report_exception(c, 'cfg_denoised_callback') report_exception(c, 'cfg_denoised_callback')
def cfg_after_cfg_callback(params: AfterCFGCallbackParams):
for c in callback_map['callbacks_cfg_after_cfg']:
try:
c.callback(params)
except Exception:
report_exception(c, 'cfg_after_cfg_callback')
def before_component_callback(component, **kwargs): def before_component_callback(component, **kwargs):
for c in callback_map['callbacks_before_component']: for c in callback_map['callbacks_before_component']:
try: try:
@ -332,6 +359,14 @@ def on_cfg_denoised(callback):
add_callback(callback_map['callbacks_cfg_denoised'], callback) add_callback(callback_map['callbacks_cfg_denoised'], callback)
def on_cfg_after_cfg(callback):
"""register a function to be called in the kdiffussion cfg_denoiser method after cfg calculations has completed.
The callback is called with one argument:
- params: CFGDenoisedParams - parameters to be passed to the inner model and sampling state details.
"""
add_callback(callback_map['callbacks_cfg_after_cfg'], callback)
def on_before_component(callback): def on_before_component(callback):
"""register a function to be called before a component is created. """register a function to be called before a component is created.
The callback is called with arguments: The callback is called with arguments:

View File

@ -8,6 +8,7 @@ from modules.shared import opts, state
import modules.shared as shared import modules.shared as shared
from modules.script_callbacks import CFGDenoiserParams, cfg_denoiser_callback from modules.script_callbacks import CFGDenoiserParams, cfg_denoiser_callback
from modules.script_callbacks import CFGDenoisedParams, cfg_denoised_callback from modules.script_callbacks import CFGDenoisedParams, cfg_denoised_callback
from modules.script_callbacks import AfterCFGCallbackParams, cfg_after_cfg_callback
samplers_k_diffusion = [ samplers_k_diffusion = [
('Euler a', 'sample_euler_ancestral', ['k_euler_a', 'k_euler_ancestral'], {}), ('Euler a', 'sample_euler_ancestral', ['k_euler_a', 'k_euler_ancestral'], {}),
@ -160,7 +161,7 @@ class CFGDenoiser(torch.nn.Module):
fake_uncond = torch.cat([x_out[i:i+1] for i in denoised_image_indexes]) fake_uncond = torch.cat([x_out[i:i+1] for i in denoised_image_indexes])
x_out = torch.cat([x_out, fake_uncond]) # we skipped uncond denoising, so we put cond-denoised image to where the uncond-denoised image should be x_out = torch.cat([x_out, fake_uncond]) # we skipped uncond denoising, so we put cond-denoised image to where the uncond-denoised image should be
denoised_params = CFGDenoisedParams(x_out, state.sampling_step, state.sampling_steps) denoised_params = CFGDenoisedParams(x_out, state.sampling_step, state.sampling_steps, self.inner_model)
cfg_denoised_callback(denoised_params) cfg_denoised_callback(denoised_params)
devices.test_for_nans(x_out, "unet") devices.test_for_nans(x_out, "unet")
@ -180,6 +181,11 @@ class CFGDenoiser(torch.nn.Module):
if self.mask is not None: if self.mask is not None:
denoised = self.init_latent * self.mask + self.nmask * denoised denoised = self.init_latent * self.mask + self.nmask * denoised
after_cfg_callback_params = AfterCFGCallbackParams(denoised, state.sampling_step, state.sampling_steps)
cfg_after_cfg_callback(after_cfg_callback_params)
if after_cfg_callback_params.output_altered:
denoised = after_cfg_callback_params.x
self.step += 1 self.step += 1
return denoised return denoised