当前位置:AIGC资讯 > AIGC > 正文

如何训练一个简单的stable diffusion模型(附详细注释)

注:代码来自https://github.com/darcula1993/diffusion-models-class-CN/blob/main/unit1/01_introduction_to_diffusers_CN.ipynb 

本文是本人学习后的的尝试以及注解

一、准备工作

"""

这行命令使用pip工具来安装或升级多个Python包。具体来说,它执行以下操作:

-qq:这是pip的安静模式选项,它会减少输出信息,只显示关键信息,使安装过程更为简洁。

-U:这是pip的升级选项,它指示pip升级已经安装的包到最新版本(如果存在新版本)。

接下来,列出了要安装或升级的包:

diffusers:一个Python包

datasets:Hugging Face Transformers库的一部分,用于提供和管理各种自然语言处理(NLP)数据集的工具。

transformers:Hugging Face Transformers库,提供了预训练的深度学习模型,用于自然语言处理和文本生成任务。

accelerate:Hugging Face库的一部分,用于加速深度学习模型的训练和推理。

ftfy:一个用于处理Unicode文本的Python库,用于修复和清理不规范的Unicode文本。

pyarrow:正如前面所提到的,pyarrow是一个用于高效处理大规模数据集的Python库,支持列式存储和跨语言互操作性。
"""

%pip install -qq -U diffusers datasets transformers accelerate ftfy pyarrow
# 登录hugging face
from huggingface_hub import notebook_login

notebook_login()

显示下图则登陆成功:

# 安装 Git LFS 来上传模型检查点:

%%capture
!sudo apt -qq install git-lfs
!git config --global credential.helper store
# 导入将要使用的库,并定义一些方便函数,稍后将会使用这些函数
import numpy as np
import torch
import torch.nn.functional as F
from matplotlib import pyplot as plt
from PIL import Image


def show_images(x):
  """Given a batch of images x, make a grid and convert to PIL"""
  '''
  输入参数:x,一个批量的图像数据(通常是PyTorch张量)。
  功能:将输入的图像数据从范围(-1, 1)映射到(0, 1),然后将这些图像排列成一个网格,并将网格转换为PIL图像。
  返回值:返回一个PIL图像,其中包含了排列好的输入图像网格。
  '''
  x = x * 0.5 + 0.5  # Map from (-1, 1) back to (0, 1)
  grid = torchvision.utils.make_grid(x)
  grid_im = grid.detach().cpu().permute(1, 2, 0).clip(0, 1) * 255
  grid_im = Image.fromarray(np.array(grid_im).astype(np.uint8))
  return grid_im


def make_grid(images, size=64):
  """Given a list of PIL images, stack them together into a line for easy viewing"""
  '''
  输入参数:images,一个包含多个PIL图像的列表,以及一个可选的size参数,用于指定图像的大小。
  功能:将多个PIL图像按照指定的大小堆叠在一行上,以便于查看。
  返回值:返回一个新的PIL图像,其中包含了堆叠在一行上的输入图像。
  '''
  output_im = Image.new("RGB", (size * len(images), size))
  for i, im in enumerate(images):
      output_im.paste(im.resize((size, size)), (i * size, 0))
  return output_im


# Mac users may need device = 'mps' (untested)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

二、下载训练数据集

# 下载一个来自 Hugging Face Hub 的图像集。具体来说,是个 1000 张蝴蝶图像收藏集
import torchvision  # 导入PyTorch的torchvision库
from datasets import load_dataset  # 导入数据集
from torchvision import transforms  # 导入PyTorch的transforms模块

# 使用load_dataset函数加载名为"huggan/smithsonian_butterflies_subset"的数据集中的训练集数据
dataset = load_dataset("huggan/smithsonian_butterflies_subset", split="train")

# 或者从本地文件夹加载图像数据
# dataset = load_dataset("imagefolder", data_dir="path/to/folder")

# 指定图像大小为32x32像素
image_size = 32
# 如果GPU内存不足,可以降低批次大小
batch_size = 64

# 定义数据增强操作
preprocess = transforms.Compose(
    [
        transforms.Resize((image_size, image_size)),  # 调整图像大小
        transforms.RandomHorizontalFlip(),  # 随机水平翻转(数据增强)
        transforms.ToTensor(),  # 将图像转换为张量(数值范围从0到1)
        transforms.Normalize([0.5], [0.5]),  # 将图像像素值归一化到(-1, 1)的范围
    ]
)

# 定义一个用于对数据进行转换的函数
def transform(examples):
    images = [preprocess(image.convert("RGB")) for image in examples["image"]]
    return {"images": images}

# 将数据集的转换函数设置为上述定义的transform函数
dataset.set_transform(transform)

# 创建一个数据加载器,用于以批次方式提供转换后的图像数据
train_dataloader = torch.utils.data.DataLoader(
    dataset, batch_size=batch_size, shuffle=True  # 使用指定的批次大小和随机打乱数据
)
# 我们可以从中取出一批图像数据来看一看他们是什么样子:

xb = next(iter(train_dataloader))["images"].to(device)[:8]
print("X shape:", xb.shape)
show_images(xb).resize((8 * 64, 64), resample=Image.NEAREST)

结果如下图所示:

三、定义管理器

我们的训练计划是,取出这些输入图片然后对它们增添噪声,在这之后把带噪的图片送入模型。在推理阶段,我们将用模型的预测值来不断迭代去除这些噪点。在diffusers中,这两个步骤都是由 管理器(调度器) 来处理的。噪声管理器决定在不同的迭代周期时分别加入多少噪声。

from diffusers import DDPMScheduler

noise_scheduler = DDPMScheduler(num_train_timesteps=1000)
# 绘图查看输入 (x) 与噪声是如何在不同迭代周期中量化和叠加的
plt.plot(noise_scheduler.alphas_cumprod.cpu() ** 0.5, label=r"${\sqrt{\bar{\alpha}_t}}$")
plt.plot((1 - noise_scheduler.alphas_cumprod.cpu()) ** 0.5, label=r"$\sqrt{(1 - \bar{\alpha}_t)}$")
plt.legend(fontsize="x-large");
# 使用noise_scheduler.add_noise功能来添加不同程度的噪声
timesteps = torch.linspace(0, 999, 8).long().to(device)
noise = torch.randn_like(xb)
noisy_xb = noise_scheduler.add_noise(xb, noise, timesteps)
print("Noisy X shape", noisy_xb.shape)
show_images(noisy_xb).resize((8 * 64, 64), resample=Image.NEAREST)

四、定义、训练模型

# 定义模型
from diffusers import UNet2DModel

# Create a model
model = UNet2DModel(
  sample_size=image_size,  # the target image resolution
  in_channels=3,  # the number of input channels, 3 for RGB images
  out_channels=3,  # the number of output channels
  layers_per_block=2,  # how many ResNet layers to use per UNet block
  block_out_channels=(64, 128, 128, 256),  # More channels -> more parameters
  down_block_types=(
      "DownBlock2D",  # a regular ResNet downsampling block
      "DownBlock2D",
      "AttnDownBlock2D",  # a ResNet downsampling block with spatial self-attention
      "AttnDownBlock2D",
  ),
  up_block_types=(
      "AttnUpBlock2D",
      "AttnUpBlock2D",  # a ResNet upsampling block with spatial self-attention
      "UpBlock2D",
      "UpBlock2D",  # a regular ResNet upsampling block
  ),
)
model.to(device);

开始训练模型 

# 创建了一个对象,用于调度噪声的添加。参数指定了训练的总步数和噪声的变化规律
noise_scheduler = DDPMScheduler(
    num_train_timesteps=1000, beta_schedule="squaredcos_cap_v2"
)

# 创建了一个AdamW优化器,用于更新模型的参数。返回模型的可训练参数,参数指定了学习率。
optimizer = torch.optim.AdamW(model.parameters(), lr=4e-4)

# 创建一个空列表,用于存储每个步骤的损失值。
losses = []

for epoch in range(30):
  for step, batch in enumerate(train_dataloader):
    # 从批次中获取干净的图像数据,并将其移动到指定的设备(例如GPU)上进行计算。
    clean_images = batch["images"].to(device)
    # 生成与干净图像相同形状的噪声张量,该噪声将被添加到图像中。
    noise = torch.randn(clean_images.shape).to(clean_images.device)
    # 获取批次的大小。
    bs = clean_images.shape[0]

    # 为每个图像随机生成一个时间步长,该时间步长将用于确定噪声的变化程度。
    timesteps = torch.randint(
      0, noise_scheduler.num_train_timesteps, (bs,), device=clean_images.device
    ).long()

    # 根据噪声调度器中的每个噪声幅度和时间步长,将噪声添加到干净图像中,生成带有噪声的图像。
    noisy_images = noise_scheduler.add_noise(clean_images, noise, timesteps)

    # 使用模型对带有噪声的图像进行预测,得到去噪后的图像。
    noise_pred = model(noisy_images, timesteps, return_dict=False)[0]

    # 计算预测图像与真实噪声之间的均方误差损失,计算损失相对于模型参数的梯度,并将当前步骤的损失添加到列表中。
    loss = F.mse_loss(noise_pred, noise)
    loss.backward(loss)
    losses.append(loss.item())

    # 使用优化器更新模型的参数,并将梯度置零。
    optimizer.step()
    optimizer.zero_grad()

  # 每隔5个epoch,计算最近一个epoch的平均损失,并打印出来。
  if (epoch + 1) % 5 == 0:
      loss_last_epoch = sum(losses[-len(train_dataloader) :]) / len(train_dataloader)
      print(f"Epoch:{epoch+1}, loss: {loss_last_epoch}")
# 绘制 loss 曲线,我们能看到模型在一开始快速的收敛,接下来以一个较慢的速度持续优化(我们用右边 log 坐标轴的视图可以看的更清楚):

fig, axs = plt.subplots(1, 2, figsize=(12, 4))
axs[0].plot(losses)
axs[1].plot(np.log(losses))
plt.show()

五、生成图像 

下面开始生成图像 

# 方法 1:建立一个管道:
from diffusers import DDPMPipeline

image_pipe = DDPMPipeline(unet=model, scheduler=noise_scheduler)
pipeline_output = image_pipe()
pipeline_output.images[0]
# 我们可以在本地文件夹这样保存一个管道:
image_pipe.save_pretrained("my_pipeline")

# 检查文件夹的内容:
!ls my_pipeline/

 这里scheduler与unet子文件夹中包含了生成图像所需的全部组件。比如,在unet文件中能看到模型参数 (diffusion_pytorch_model.bin) 与描述模型结构的配置文件。

# 方法 2:写一个取样循环

# 从随机噪声开始,遍历管理器的迭代周期来看从最嘈杂直到最微小的噪声变化,基于模型的预测一步步减少一些噪声:

# Random starting point (8 random images):
sample = torch.randn(8, 3, 32, 32).to(device)

for i, t in enumerate(noise_scheduler.timesteps):

    # Get model pred
    with torch.no_grad():
        residual = model(sample, t).sample

    # Update sample with step
    sample = noise_scheduler.step(residual, t, sample).prev_sample
    
# noise_scheduler.step () 函数相应做了 sample(取样)时的数学运算。
show_images(sample)

 六、将模型上传到Hugging Face

在上面的例子中我们把管道保存在了本地。 把模型 push 到 hub 上,我们会需要建立模型和相应文件的仓库名。 我们根据你的选择(模型 ID)来决定仓库的名字(大胆的去替换掉model_name吧;需要包含你的用户名,get_full_repo_name ()会帮你做到):

from huggingface_hub import get_full_repo_name

model_name = "sd-class-butterflies-32"
hub_model_id = get_full_repo_name(model_name)
hub_model_id
# 然后,在 ? Hub 上创建模型仓库并 push 它吧:

from huggingface_hub import HfApi, create_repo

create_repo(hub_model_id)
api = HfApi()
api.upload_folder(
    folder_path="my_pipeline/scheduler", path_in_repo="", repo_id=hub_model_id
)
api.upload_folder(folder_path="my_pipeline/unet", path_in_repo="", repo_id=hub_model_id)
api.upload_file(
    path_or_fileobj="my_pipeline/model_index.json",
    path_in_repo="model_index.json",
    repo_id=hub_model_id,
)
# 最后一件事是创建一个超棒的模型卡,如此,我们的蝴蝶生成器可以轻松的在 Hub 上被找到(请在描述中随意发挥!):

from huggingface_hub import ModelCard

content = f"""
---
license: mit
tags:
- pytorch
- diffusers
- unconditional-image-generation
- diffusion-models-class
---

# Model Card for Unit 1 of the [Diffusion Models Class ?](https://github.com/huggingface/diffusion-models-class)

This model is a diffusion model for unconditional image generation of cute ?.

## Usage

```python
from diffusers import DDPMPipeline

pipeline = DDPMPipeline.from_pretrained('{hub_model_id}')
image = pipeline().images[0]
image
```
"""

card = ModelCard(content)
card.push_to_hub(hub_model_id)
# 现在模型已经在 Hub 上了,你可以这样从任何地方使用DDPMPipeline的from_pretrained ()方法来下来它:
from diffusers import DDPMPipeline

image_pipe = DDPMPipeline.from_pretrained(hub_model_id)
pipeline_output = image_pipe()
pipeline_output.images[0]

更新时间 2023-12-15