@ -30,12 +30,14 @@ from PIL import Image
import PIL.ImageOps as ImageOps
from pynvml import *
from transformers import AutoModelForCausalLM, LlamaTokenizer, PreTrainedTokenizer, BitsAndBytesConfig
from transformers import AutoModelForCausalLM, LlamaTokenizer, BitsAndBytesConfig, LlavaForConditionalGeneration, AutoProcessor, LlavaProcessor, AutoTokenizer
from transformers.modeling_outputs import BaseModelOutputWithPast
from colorama import Fore, Style
from plugins.caption_plugins import load_prompt_alteration_plugin
from utils.patch_cog import patch_cog
from utils.ed_logging import configure_logging
from data.generators import image_path_generator, SUPPORTED_EXT
@ -48,54 +50,8 @@ Image.MAX_IMAGE_PIXELS = 715827880*4 # expand the size limit
IMAGE_SIZE: int = 490
PATCH_SIZE: int = 14
patch_cog() # fixes inv_freq key error with cogvlm, quantization, and newer transformers revisions
def build_conversation_input_ids(
tokenizer: PreTrainedTokenizer,
query: str,
history: Optional[List[Tuple[str, str]]] = None,
images: Optional[List[Image.Image]] = None,
starts_with: Optional[str] = None,
# based on
image_size: int = IMAGE_SIZE
patch_size: int = PATCH_SIZE
assert images is None or len(images) <= 1, f"not support multi images by now."
history = history or []
text = f"Question: {query} Answer: "
text += starts_with if starts_with is not None else ""
input_ids = [tokenizer.bos_token_id]
token_type_ids = [0]
if images is not None and len(images) == 1:
# vision
transform = transforms.Compose(
(image_size, image_size), interpolation=transforms.InterpolationMode.BICUBIC
transforms.Normalize((0.48145466, 0.4578275, 0.40821073), (0.26862954, 0.26130258, 0.27577711)),
images = [transform(images[0])]
vision_token_num = (image_size // patch_size) * (image_size // patch_size) + 2
input_ids += [tokenizer.pad_token_id] * vision_token_num
token_type_ids += [1] * vision_token_num
text_ids = tokenizer.encode(text, add_special_tokens=False)
input_ids += text_ids
token_type_ids += [0] * len(text_ids)
attention_mask = [1] * len(input_ids)
return {
"input_ids": torch.tensor(input_ids, dtype=torch.long),
"token_type_ids": torch.tensor(token_type_ids, dtype=torch.long),
"attention_mask": torch.tensor(attention_mask, dtype=torch.long),
"images": images,
torch.backends.cuda.matmul.allow_tf32 = True
torch.backends.cudnn.benchmark = True
def get_gpu_memory_map():
@ -116,7 +72,7 @@ def save_params(args, gen_kwargs):
def create_bnb_config():
return BitsAndBytesConfig(
bnb_4bit_quant_type= "fp4",
@ -128,6 +84,121 @@ def create_bnb_config():
class BaseModelWrapper:
def __init__(self, model_name):
self.model_name = model_name"Loading {model_name}")
def load_model(self, bits: int=4, grad_ckpt: bool=False, lora: bool=False, dtype: str="fp16"):
self.model = AutoModelForCausalLM.from_pretrained(
self.tokenizer = AutoProcessor.from_pretrained(self.model_name)
return self.model, self.tokenizer
def get_gen_kwargs(self, args):
gen_kwargs = {
"max_length": args.max_length,
"do_sample": args.top_k is not None or args.top_p is not None or args.temp is not None or False,
"length_penalty": args.length_penalty,
"num_beams": args.num_beams,
"temperature": args.temp,
"top_k": args.top_k,
"top_p": args.top_p,
"repetition_penalty": args.repetition_penalty,
"no_repeat_ngram_size": args.no_repeat_ngram_size,
"min_new_tokens": args.min_new_tokens,
"max_new_tokens": args.max_new_tokens,
"length_penalty": args.length_penalty,
if args.max_new_tokens is not None:"** max_new_tokens set to {args.max_new_tokens}, ignoring max_length")
del gen_kwargs["max_length"]
if not gen_kwargs["do_sample"]:"** Using greedy sampling")
del gen_kwargs["top_k"]
del gen_kwargs["top_p"]
del gen_kwargs["temperature"]
else:"** Sampling enabled")
return gen_kwargs
def caption(prompt, args):
return ""
class XtunerLlavaModelManager(BaseModelWrapper): #
def __init__(self, model_name: str="xtuner/llava-llama-3-8b-v1_1-transformers"):
self.model_name = "xtuner/llava-llama-3-8b-v1_1-transformers"
def load_model(self, bits: int=4, grad_ckpt: bool=False, lora: bool=False, dtype: str="fp16"):
self.model = LlavaForConditionalGeneration.from_pretrained(
#self.model = AutoModelForCausalLM.from_pretrained(
self.processor = LlavaProcessor.from_pretrained(self.model_name)
self.tokenizer = AutoTokenizer.from_pretrained("xtuner/llava-llama-3-8b-v1_1-transformers")
print(f"self.tokenizer: {self.tokenizer}")
# tokens = self.tokenizer("foo")
# print(f"foo tokens test1: {tokens}")
return self.model, self.tokenizer
def get_inputs(self, image: Image.Image, prompt: str):
inputs = self.processor(prompt, image, return_tensors='pt').to(0, torch.float16)
return inputs
def _build_conversational_input_ids(self, prompt, starts_with):
return (f"<|start_header_id|>user<|end_header_id|>\n\n<image>\n{prompt}<|eot_id|>"
def _get_full_sentences(self, caption, args):
logging.debug(f"**DEBUG: XtunerLlava presplit caption: {caption}")
if args.max_length is not None and len(caption) > args.max_length:
caption = caption[:args.max_length]
caption = caption.split(".")
#sentence_count = min(4, len(caption))
caption = ". ".join(caption[0:-1]) + "."
logging.debug(f"**DEBUG: caption: {caption}")
return caption
def caption(self, prompt, image, args, force_words_ids, bad_words_ids, history=[]):
gen_kwargs = self.get_gen_kwargs(args)
prompt = self._build_conversational_input_ids(prompt, args.starts_with)
inputs = self.processor(prompt, image, return_tensors='pt').to(0, torch.float16)
# inputs = processor(prompt, raw_image, return_tensors='pt').to(0, torch.float16)
inputs = {
"input_ids": inputs["input_ids"],
"attention_mask": inputs['attention_mask'],
"pixel_values": inputs['pixel_values'],
#"images": [[inputs["images"][0].to("cuda").to(torch.bfloat16)] for _ in range(args.num_beams)],
#"output_hidden_states": True,
#"return_dict": True
len_inputs = inputs['input_ids'].shape[1]
outputs = self.model.generate(**inputs, **gen_kwargs, force_words_ids=force_words_ids, bad_words_ids=bad_words_ids)
caption = self.processor.decode(outputs[0][len_inputs:], skip_special_tokens=True)
caption = self._get_full_sentences(caption, args)
return caption
class MoaiManager:
def __init__(self, model_name: str):
self.model_name = model_name
@ -167,17 +238,17 @@ class MoaiManager:
return moai_inputs
def __call__(self, moai_inputs, do_sample=True, temperature=0.9, top_p=0.95, max_new_tokens=256, use_cache=True) -> Any:
with torch.inference_mode():
generate_ids = self.moai_model.generate(**moai_inputs, do_sample=do_sample, temperature=temperature, top_p=top_p, max_new_tokens=max_new_tokens, use_cache=use_cache)
answer = self.moai_processor.batch_decode(generate_ids, skip_special_tokens=True)[0].split("[U")[0]
return answer
# def __call__(self, moai_inputs, do_sample=True, temperature=0.9, top_p=0.95, max_new_tokens=256, use_cache=True) -> Any:
# with torch.inference_mode():
# generate_ids = self.moai_model.generate(**moai_inputs, do_sample=do_sample, temperature=temperature, top_p=top_p, max_new_tokens=max_new_tokens, use_cache=use_cache)
# answer = self.moai_processor.batch_decode(generate_ids, skip_special_tokens=True)[0].split("[U")[0]
# return answer
class CogVLMManager:
class CogVLMManager(BaseModelWrapper):
def __init__(self, model_name: str):
self.model_name = model_name
self.tokenizer = None
self.model = None
self.model_name = "THUDM/cogvlm-chat-hf"
patch_cog() # fixes inv_freq key error with cogvlm, quantization, and newer transformers revisions
def load_model(self):
self.tokenizer = LlamaTokenizer.from_pretrained("lmsys/vicuna-7b-v1.5")
@ -190,67 +261,120 @@ class CogVLMManager:
return self.model, self.tokenizer
def get_inputs(self, prompt: str, history: List[Tuple[str, str]], images: List[Image.Image], starts_with: str):
return build_conversation_input_ids(self.tokenizer, query=prompt, history=history, images=images, starts_with=starts_with)
def _build_conversation_input_ids(self,
query: str,
history: Optional[List[Tuple[str, str]]] = None,
images: Optional[List[Image.Image]] = None,
starts_with: Optional[str] = None,
# based on
image_size: int = IMAGE_SIZE
patch_size: int = PATCH_SIZE
assert images is None or len(images) <= 1, f"not support multi images by now."
history = history or []
def get_gen_kwargs(self, args):
gen_kwargs = {
"max_length": args.max_length,
"do_sample": args.top_k is not None or args.top_p is not None or args.temp is not None or False,
"length_penalty": args.length_penalty,
"num_beams": args.num_beams,
"temperature": args.temp,
"top_k": args.top_k,
"top_p": args.top_p,
"repetition_penalty": args.repetition_penalty,
"no_repeat_ngram_size": args.no_repeat_ngram_size,
"min_new_tokens": args.min_new_tokens,
"max_new_tokens": args.max_new_tokens,
"length_penalty": args.length_penalty,
text = f"Question: {query} Answer: "
text += starts_with if starts_with is not None else ""
input_ids = [self.tokenizer.bos_token_id]
token_type_ids = [0]
if images is not None and len(images) == 1:
# vision
transform = transforms.Compose(
(image_size, image_size), interpolation=transforms.InterpolationMode.BICUBIC
transforms.Normalize((0.48145466, 0.4578275, 0.40821073), (0.26862954, 0.26130258, 0.27577711)),
images = [transform(images[0])]
vision_token_num = (image_size // patch_size) * (image_size // patch_size) + 2
input_ids += [self.tokenizer.pad_token_id] * vision_token_num
token_type_ids += [1] * vision_token_num
text_ids = self.tokenizer.encode(text, add_special_tokens=False)
input_ids += text_ids
token_type_ids += [0] * len(text_ids)
attention_mask = [1] * len(input_ids)
return {
"input_ids": torch.tensor(input_ids, dtype=torch.long),
"token_type_ids": torch.tensor(token_type_ids, dtype=torch.long),
"attention_mask": torch.tensor(attention_mask, dtype=torch.long),
"images": images,
if args.max_new_tokens is not None:"** max_new_tokens set to {args.max_new_tokens}, ignoring max_length")
del gen_kwargs["max_length"]
if not gen_kwargs["do_sample"]:"** Using greedy sampling")
del gen_kwargs["top_k"]
del gen_kwargs["top_p"]
del gen_kwargs["temperature"]
else:"** Sampling enabled")
return gen_kwargs
def caption(self, prompt, image, args, force_words_ids, bad_words_ids, history=[]):
gen_kwargs = self.get_gen_kwargs(args)
def model_manager_factory(model_name: str):
inputs = self._build_conversation_input_ids(query=prompt, history=history, images=[image], starts_with=args.starts_with)
inputs = {
"input_ids": inputs["input_ids"].unsqueeze(0).to("cuda"),
"token_type_ids": inputs['token_type_ids'].unsqueeze(0).to("cuda"),
"attention_mask": inputs['attention_mask'].unsqueeze(0).to("cuda"),
"images": [[inputs["images"][0].to("cuda").to(torch.bfloat16)] for _ in range(args.num_beams)],
"output_hidden_states": True,
"return_dict": True
outputs = self.model.generate(**inputs, **gen_kwargs, force_words_ids=force_words_ids, bad_words_ids=bad_words_ids)
#print(f"type of outputs: {type(outputs)}, outputs shape: {outputs.shape}")
#print(f"type of hidden_states: {type(hidden_states)}, outputs shape: {hidden_states.shape}")
len_inputs = inputs['input_ids'].shape[1]
outputs_without_prompt = outputs[:, len_inputs:]
caption = self.tokenizer.decode(outputs_without_prompt[0], skip_special_tokens=True)
return caption
def get_model_wrapper(model_name: str):
if "moai" in model_name:
return MoaiManager(model_name)
elif "llava" in model_name:
return XtunerLlavaModelManager(model_name)
return CogVLMManager(model_name)
def get_inputs_dict(inputs):
inputs = {
"input_ids": inputs["input_ids"].unsqueeze(0).to("cuda"),
"token_type_ids": inputs['token_type_ids'].unsqueeze(0).to("cuda"),
"attention_mask": inputs['attention_mask'].unsqueeze(0).to("cuda"),
"images": [[inputs["images"][0].to("cuda").to(torch.bfloat16)] for _ in range(args.num_beams)],
"output_hidden_states": True,
"return_dict": True
def main(args):
prompt_plugin_fn = load_prompt_alteration_plugin(args.prompt_plugin, args=args)
model_manager = model_manager_factory(args.model)
model, tokenizer = model_manager.load_model()
model_wrapper = get_model_wrapper(args.model)
args.append = args.append or ""
if len(args.append) > 0:
args.append = " " + args.append.strip()
gen_kwargs = model_manager.get_gen_kwargs(args)
gen_kwargs = model_wrapper.get_gen_kwargs(args)
force_words_ids = None
if args.force_words is not None:
force_words = args.force_words.split(",") if args.force_words is not None else []"** force_words: {Fore.LIGHTGREEN_EX}{force_words}{Style.RESET_ALL}")
force_words_ids = tokenizer(force_words, add_special_tokens=False)["input_ids"] if force_words else []
# if args.model contains "cog"
if "cog" in args.model:
force_words_ids = model_wrapper.tokenizer(force_words, add_special_tokens=False)["input_ids"] if force_words else []
force_words_ids = model_wrapper.tokenizer(force_words)["input_ids"] if force_words else []
bad_words_ids = None
if args.bad_words is not None:
bad_words = args.bad_words.split(",") if args.bad_words is not None else []"** bad_words: {Fore.LIGHTGREEN_EX}{bad_words}{Style.RESET_ALL}")
bad_words_ids = tokenizer(bad_words, add_special_tokens=False)["input_ids"] if bad_words else []
bad_words_ids = model_wrapper.tokenizer(bad_words, add_special_tokens=False)["input_ids"] if bad_words else []
#print(bad_words_ids)"** gen_kwargs: \n{Fore.LIGHTGREEN_EX}{gen_kwargs}{Style.RESET_ALL}")
@ -278,40 +402,19 @@ def main(args):
logging.warning(f"Non-fatal error processing {image_path}: {e}")
pixel_count = image.height * image.width
if pixel_count < args.min_pixels:
logging.warning(f" * Image under {args.min_pixels} pixels, skipping. Path: {image_path}")
logging.debug(f" __ Prompt before plugin: {Fore.LIGHTGREEN_EX}{args.prompt}{Style.RESET_ALL}")
prompt = prompt_plugin_fn(image_path, args=args)
logging.debug(f" __ Modified prompt after plugin: {Fore.LIGHTGREEN_EX}{prompt}{Style.RESET_ALL}")
inputs = build_conversation_input_ids(tokenizer, query=prompt, history=[], images=[image], starts_with=args.starts_with) # chat mode
inputs = {
"input_ids": inputs["input_ids"].unsqueeze(0).to("cuda"),
"token_type_ids": inputs['token_type_ids'].unsqueeze(0).to("cuda"),
"attention_mask": inputs['attention_mask'].unsqueeze(0).to("cuda"),
"images": [[inputs["images"][0].to("cuda").to(torch.bfloat16)] for _ in range(args.num_beams)],
"output_hidden_states": True,
"return_dict": True
# print(f"** inputs type: {type(inputs)}") # dict
# print(f"** inputs len: {len(inputs)}") # 4
# print(f"** inputs keys: {inputs.keys()}") # dict_keys(['input_ids', 'token_type_ids', 'attention_mask', 'images'])
# print(f"** inputs['images'] shape: {inputs['images'].shape}") # list has no shape
# print(f"** image_path: {image_path}")
with torch.no_grad():
#input_decoded = tokenizer.decode(inputs["input_ids"][0], skip_special_tokens=True)
#logging.debug(f"inputs decoded: {input_decoded}")
#print(f"calling generate with input shapes: {inputs['input_ids'].shape}, {inputs['token_type_ids'].shape}, {inputs['attention_mask'].shape}, {inputs['images'][0][0].shape}")
#calling generate with input shapes: torch.Size([1, 1352]), torch.Size([1, 1352]), torch.Size([1, 1352]), torch.Size([3, 490, 490])
outputs = model.generate(**inputs, **gen_kwargs, force_words_ids=force_words_ids, bad_words_ids=bad_words_ids)
print(f"type of outputs: {type(outputs)}, outputs shape: {outputs.shape}")
#print(f"type of hidden_states: {type(hidden_states)}, outputs shape: {hidden_states.shape}")
#def caption(self, prompt, images, args, force_words_ids, bad_words_ids, history=[]):
caption = model_wrapper.caption(prompt, image, args, force_words_ids=force_words_ids, bad_words_ids=bad_words_ids)
len_inputs = inputs['input_ids'].shape[1]
outputs_without_prompt = outputs[:, len_inputs:]
caption = tokenizer.decode(outputs_without_prompt[0], skip_special_tokens=True)
if not args.remove_starts_with:
# deal with caption starting with comma, etc
if not re.match(r"^\W", caption):
@ -325,7 +428,7 @@ def main(args):
vram_gb = get_gpu_memory_map()
elapsed_time = time.time() - cap_start_time"n:{i:05}, VRAM: {Fore.LIGHTYELLOW_EX}{vram_gb:0.1f} GB{Style.RESET_ALL}, elapsed: {Fore.LIGHTYELLOW_EX}{elapsed_time:0.1f}{Style.RESET_ALL} sec, Captioned {Fore.LIGHTYELLOW_EX}{image_path}{Style.RESET_ALL}: ")"n:{i:05}, VRAM: {Fore.LIGHTYELLOW_EX}{vram_gb:0.1f} GB{Style.RESET_ALL}, elapsed: {Fore.LIGHTYELLOW_EX}{elapsed_time:0.1f}{Style.RESET_ALL} sec, sqrt_pixels: {pow(float(pixel_count),0.5):0.1f}, Captioned {Fore.LIGHTYELLOW_EX}{image_path}{Style.RESET_ALL}: ")"{Fore.LIGHTCYAN_EX}{caption}{Style.RESET_ALL}")
i_processed += 1
@ -339,19 +442,6 @@ def main(args):
def configure_logging(args: argparse.Namespace):
level = logging.INFO if not args.debug else logging.DEBUG
filemode = "a" if args.append_log else "w"
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
console = logging.StreamHandler()
EXAMPLES = """ex.
Basic example:
python --image_dir /mnt/mydata/kyrie/ --prompt 'Describe this image in detail, including the subject matter and medium of the artwork.'
@ -409,9 +499,10 @@ if __name__ == "__main__":
argparser.add_argument("--remove_starts_with", action="store_true", help="Removes the starts_with words from the output caption.")
argparser.add_argument("--append_log", action="store_true", help="Sets logging to append mode.")
argparser.add_argument("--model", type=str, default="THUDM/cogvlm-chat-hf", help="Model to use for captioning.")
argparser.add_argument("--min_pixels", type=int, default=1, help="Minimum total pixel size to caption, under the limit will be skipped")
args, unknown_args = argparser.parse_known_args()
configure_logging(args, "caption_cog.log")
unknown_args_dict = {}
@ -0,0 +1,17 @@

@ -46,9 +46,9 @@ This may also be useful to really "force" a style into the model with a high set
## Timestep clamping
Stable Diffusion uses 1000 possible timesteps for denoising steps. If you wish to train only a portion of those timesteps instead of the entire schedule you can clamp the value.
Stable Diffusion uses 1000 possible timesteps for denoising steps. Timesteps are always chosen randomly per training example, per step, within the possible or allowed timesteps.
Timesteps are always chosen randomly per training example, per step, within the possible or allowed timesteps.
If you wish to train only a portion of those timesteps instead of the entire schedule you can clamp the value.
For instance, if you only want to train from 500 to 999, use this:
@ -58,7 +58,9 @@ Or if you only want to try from 0 to 449, use this:
--timestep_end 450
Possible use cases are to "focus" training on aesthetics or composition. It's likely you may need to train all timesteps as a "clean up" if you train just specific timestep ranges first.
Possible use cases are to "focus" training on aesthetics or composition by limiting timesteps and training specific data with certain qualities. It's likely you may need to train all timesteps as a "clean up" if you train just specific timestep ranges first so the model does not overfit the fine tuned timesteps and lead to problems during inference.
This could also be used to train expert models for specific timestep ranges, similar to the SDXL Refiner model.
## Loss Type

@ -228,6 +228,54 @@ class TitleAndTagsFromImageJson(PromptIdentityBase):
logging.debug(f" {self.key}: prompt after: {prompt}")
return prompt
class VogueRunwayImageJson(PromptIdentityBase):
def __init__(self, args:Namespace=None):
description="Adds title and tags hint from metadata.json (in the samefolder as the image) to the prompt",
def try_get_kvps(self, metadata, keys:list):
values = []
for key in keys:
val = metadata.get(key, "")
if not val:
if type(val) == int:
val = str(val)
val = val.strip()
values.append(f"{key}: {val}")
hint = ", ".join(values)
return hint
def _title_and_tags_from_metadata_json(self, args:Namespace) -> str:
prompt = args.prompt
logging.debug(f" {self.key}: prompt before: {prompt}")
image_path = args.image_path
current_dir = os.path.dirname(image_path)
image_path_base = os.path.basename(image_path)
image_path_without_extension = os.path.splitext(image_path_base)[0]
candidate_json_path = os.path.join(current_dir, f"{image_path_without_extension}.json")
if os.path.exists(candidate_json_path):
with open(candidate_json_path, "r") as f:
metadata = json.load(f)
keys = ["designer","season","category","year"]
hint = ""
hint = self.try_get_kvps(metadata, keys)
tags = metadata.get("tags", [])
tags = tags.split(",") if isinstance(tags, str) else tags # can be csv or list
if tags and len(tags) > 0:
tags = ", ".join(tags)
hint += f"\nTags: {tags}"
prompt = self._add_hint_to_prompt(hint, prompt)
logging.debug(f" {self.key}: prompt after: {prompt}")
return prompt
class TitleAndTagsFromFolderMetadataJson(PromptIdentityBase):
def __init__(self, args:Namespace=None):
@ -0,0 +1,17 @@

@ -0,0 +1,17 @@
import logging
import argparse
def configure_logging(args: argparse.Namespace, log_file=None):
level = logging.INFO if not args.debug else logging.DEBUG
if log_file:
filemode = "a" if args.append_log else "w"
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
console = logging.StreamHandler()