当前位置:AIGC资讯 > AIGC > 正文

AIGC入门(二)从零开始搭建Diffusion!(下)

上篇。https://blog.csdn.net/alxws/article/details/140058117?spm=1001.2014.3001.5502四、前向加噪过程(ForwardProcess.py)

当我们的去噪器设计完成后,接下来,就是我们的另外两个重要的部分:前向加噪过程和反向去噪过程了。

我们先看看论文原文中的那个算法介绍:

左边为训练过程,右边为去噪采样过程。

在训练过程中,我们是通过那个公式模拟一步到加噪t步的图片。然后,我们将这个图片来假装是模型自己获得的t步的图片,并在此基础上进行“加噪”。

我们将模型生成的噪声与我们在高斯噪声采样中得到的结果做差,来模拟基于某张个时间步后得到的噪声。之后我们将在去噪过程中,按照公式完成我们的图片的生成。

那么,开始!首先看看我们的前向加噪过程是怎么实现的吧!

这是我们导入的包:

import torch
import torch.nn.functional as F
from torchvision.transforms import Compose, ToTensor, Lambda, ToPILImage, CenterCrop, Resize,RandomHorizontalFlip
import numpy as np

1、 beta的选择

我们在前面简单介绍过程的时候,说了一嘴,且。实际上,这个 𝛽 的递增序列我们有各种各样不同的写法。最常见的就是让这个序列线性增长:

# 定义一个线性beta调度函数,生成一个线性增长的beta值序列。
def linear_beta_schedule(timesteps):
    # 设置beta值的起始和结束值。
    beta_start = 0.0001
    beta_end = 0.02
    # 返回一个从起始值到结束值线性增长的序列。
    return torch.linspace(beta_start, beta_end, timesteps)

或者使用二次函数的方式去生成这个序列:

# 定义一个二次方beta调度函数,生成一个二次方增长的beta值序列。
def quadratic_beta_schedule(timesteps):
    # 设置beta值的起始和结束值。
    beta_start = 0.0001
    beta_end = 0.02
    # 返回一个从起始值的平方根到结束值的平方根线性增长的序列,并将其平方。
    return torch.linspace(beta_start**0.5, beta_end**0.5, timesteps) ** 2

或者是“缓-急-缓”,即从sigmod函数中采样得到对应的序列:

# 定义一个S形beta调度函数,生成一个S形曲线的beta值序列。
def sigmoid_beta_schedule(timesteps):
    # 设置beta值的起始和结束值。
    beta_start = 0.0001
    beta_end = 0.02
    # 创建一个从-6到6的等间隔序列,用于sigmoid函数的输入。
    betas = torch.linspace(-6, 6, timesteps)
    # 应用sigmoid函数并调整序列,使其在指定的起始和结束值之间。
    return torch.sigmoid(betas) * (beta_end - beta_start) + beta_start

还有人在这方面做过相关的研究,不过我们不去深入探究了。在本文中,笔者使用的还是线性调度的方式去生成相关的序列。

2、 其余相关参数的生成

有了,我们剩下的要做的,就是获得我们需要的一系列参数。让我们先来回顾一下比较重要的两个公式:

一步到位的前向加噪过程公式: 基于上述公式的反向去噪过程:

其中有很多参数都是我们可以直接计算得到的。即通过我们的GetElements()函数得到。其中输出的结果有这样的对应关系:

betas对应;

alphas对应 ;

sqrt_recip_alphas对应 ;

alphas_cumprod对应;

alphas_cumprod_prev对应;

sqrt_alphas_cumprod对应;

sqrt_one_minus_alphas_cumprod对应;

posterior_variance对应。

让我们使用下面的函数来获得我们的结果:

# 获取相关参数数值;
def GetElements(timesteps=300):
    betas = linear_beta_schedule(timesteps=timesteps)

    # 通过公式定义\bar{alpha}_t。
    alphas = 1. - betas
    alphas_cumprod = torch.cumprod(alphas, axis=0) # 返回逐步累乘结果;
    alphas_cumprod_prev = F.pad(alphas_cumprod[:-1], (1, 0), value=1.0)
    sqrt_recip_alphas = torch.sqrt(1.0 / alphas)

    # 计算q(x_t | x_{t-1});
    sqrt_alphas_cumprod = torch.sqrt(alphas_cumprod)
    sqrt_one_minus_alphas_cumprod = torch.sqrt(1. - alphas_cumprod)

    # 计算 q(x_{t-1} | x_t, x_0);
    posterior_variance = betas * (1. - alphas_cumprod_prev) / (1. - alphas_cumprod)
    return [betas,# β参数,控制噪声的加入;
            alphas,
            alphas_cumprod,
            alphas_cumprod_prev,
            sqrt_recip_alphas,  # α的平方根的倒数;
            sqrt_alphas_cumprod,
            sqrt_one_minus_alphas_cumprod,  # 1-α的累积乘积的平方根;
            posterior_variance] # 后验方差;

3、前向加噪过程部分

接下来,就是公式的直接应用!让我们使用这个公式来完成我们的一步加噪过程吧!

# 定义前向加噪过程,即一步骤到位的加噪过程;
def q_sample(x_start, 
             t, 
             sqrt_alphas_cumprod,
             sqrt_one_minus_alphas_cumprod,
             noise=None):
    if noise is None:
        noise = torch.randn_like(x_start)
    sqrt_alphas_cumprod_t = extract(sqrt_alphas_cumprod, t, x_start.shape)
    sqrt_one_minus_alphas_cumprod_t = extract(
        sqrt_one_minus_alphas_cumprod, t, x_start.shape
    )
    return sqrt_alphas_cumprod_t * x_start + sqrt_one_minus_alphas_cumprod_t * noise

其中有一个函数extract(),它的作用是从输入张量中提取特定的元素,并将它们重塑成与输入张量 x_shape 相关的形状。

# 从输入张量a中提取特定的元素,并将它们重塑成与输入张量 x_shape 相关的形状;
def extract(a, t, x_shape):
    batch_size = t.shape[0] # 获取批次大小,即t张量的第一个维度的大小;
    # 使用gather函数根据t张量中的索引来提取a张量中的元素;
    out = a.gather(-1, t.cpu())
    # 重塑提取出的元素,使其形状与x_shape的前n-1个维度相匹配,同时保持批次大小不变;
    reshaped_out = out.reshape(batch_size, *((1,) * (len(x_shape) - 1)))
    return reshaped_out.to(t.device)

现在,假设我们要对一张图片进行加噪。让我们看看这个加噪过程是怎么完成的吧!以每五十步加噪采样得到的结果组成一组套图来康康:

t=5,55,105,155,205的加噪图。

现在,整个加噪过程都已经完成了!但我们是要将输入的图片转化为张量后放入到其中进行运算的。因此我们需要一些辅助函数来帮忙计算一下。比如这个图像转张量的函数:

# 将一张图像转化为一个张量;
def image2tensor(image,image_size=64):
    transform = Compose([
        Resize(image_size),
        CenterCrop(image_size),
        RandomHorizontalFlip(),
        ToTensor(),
        Lambda(lambda t: (t * 2) - 1),
    ])
    return transform(image)

还有将张量转化为图像的函数:

# 将一个张量转化为一张图像;
def tensor2image(tensor):
    reverse_transform = Compose([
        Lambda(lambda t: (t + 1) / 2),
        Lambda(lambda t: t.permute(1, 2, 0)), # CHW to HWC
        Lambda(lambda t: t * 255.),
        Lambda(lambda t: t.numpy().astype(np.uint8)),
        ToPILImage(),
    ])
    return reverse_transform(tensor.squeeze())

到此,我们的前向传播过程就全部结束啦!根据算法1,我们接下来要做的就是Loss相关的计算了,并且构建我们的反向传播过程。先看看Loss吧!

五、损失函数的设计、训练集的构成(Loss.py)

有了前向加噪过程,我们要做的,就是在此基础上进行Loss的设计了。这其实相当的容易。

下面是我们要导入的包:

import ForwardProcess as FP
import torch.nn.functional as F
import torch
from PIL import Image
import os
from torch.utils.data import Dataset, DataLoader

首先,是我们的Loss设计。

1、Loss设计

根据算法1,我们的Loss要做的其实很简单。我们实际上就是做算法1的4到5步。

为了不用来回翻,笔者在此再放一次图。

则Loss设计的代码如下:

# 损失函数;
def p_losses(denoise_model, 
             x_start, 
             t, 
             sqrt_alphas_cumprod,
             sqrt_one_minus_alphas_cumprod,
             noise=None, 
             loss_type="l1"):
    # 如果没有提供噪声,就创建一个与x_start形状相同的随机噪声;
    if noise is None:
        noise = torch.randn_like(x_start)

    # 通过前向过程获得某一时间步的时候的加噪图像;
    x_noisy = FP.q_sample(x_start=x_start, 
                          t=t, 
                          sqrt_alphas_cumprod=sqrt_alphas_cumprod,
                          sqrt_one_minus_alphas_cumprod=sqrt_one_minus_alphas_cumprod,
                          noise=noise)
    # 让模型输出其结果;
    predicted_noise = denoise_model(x_noisy, t)

    # 根据不同type设计loss下降;
    if loss_type == 'l1':
        loss = F.l1_loss(noise, predicted_noise)
    elif loss_type == 'l2':
        loss = F.mse_loss(noise, predicted_noise)
    elif loss_type == "huber":
        loss = F.smooth_l1_loss(noise, predicted_noise)
    else:
        raise NotImplementedError()

    return loss

我们是通过公式模拟一步到加噪t步的图片。然后,我们将这个图片来假装是模型自己获得的t步的图片,并在此基础上进行“加噪”。

2、数据集构建

然后,我们就可以构建自己的数据集了。笔者采用的是ImageNet64x64数据集,并且使用了里面的pizza图。在文件夹里一眼望过去长这样:

每张图片尺寸大小为64x64,且均为RGB图像,一共3000张。

笔者便根据此构建数据集。是很常见的构建方法,正常重写__len__和__getitem__:

# DataLoader的创立;
class CIFARdataset(Dataset):
    def __init__(self, dataset_folder, transform=None):
        super(CIFARdataset, self).__init__()
        self.dataset_folder = dataset_folder
        self.transform = transform
        self.image_files = \
            [os.path.join(self.dataset_folder, file) for file in os.listdir(self.dataset_folder)]
        
    def __len__(self):
        return len(self.image_files)

    def __getitem__(self, idx):
        image_path = self.image_files[idx]
        image = Image.open(str(image_path)).convert('RGB')
        if self.transform:
            image = self.transform(image)
        return image

然后创建一个DataLoader来管理数据:

# 创建dataloader;
def makeDataLoader(datapath="dataset"):
    dataset = CIFARdataset(datapath, transform=FP.image2tensor)
    dataloader = DataLoader(dataset,batch_size=32,shuffle=True)
    return dataloader

到此,我们的整个文件就全部写完啦!接下来,让我们再跑一遍算法1,来进行一遍完整的训练过程。

六、训练过程(train.py)

有了前向加噪过程和Loss的支持,我们就可以完成训练的整个过程了!其实就是完整跑一遍论文中的算法1啦。

首先,先导入一系列的包:

import torch
import Loss
import Model
import ForwardProcess as FP
from torch.optim import Adam
from torchvision.utils import save_image

我们需要做一些训练前的准备。

1、参数、模型加载

首先,我们先加载我们的一些参数:

# 获得超参数;
elements = FP.GetElements()
# 其中不同的参数的含义;
# [betas,# β参数,控制噪声的加入;                               0
#  alphas,                                                    1
#  alphas_cumprod,                                            2
#  alphas_cumprod_prev,                                       3
#  sqrt_recip_alphas,  # α的平方根的倒数;                      4
#  sqrt_alphas_cumprod,                                       5
#  sqrt_one_minus_alphas_cumprod,  # 1-α的累积乘积的平方根;    6
#  posterior_variance] # 后验方差;                            7

然后加载我们的模型:

# 在gpu上使用程序;
device = "cuda" if torch.cuda.is_available() else "cpu"

# 确定模型参数;
image_size = 64
channels = 3
model = Model.Unet(
    dim=image_size,
    channels=channels,
    dim_mults=(1, 2, 4,)
)
model.to(device)

然后加载优化器和数据集:

# 确定优化器;
optimizer = Adam(model.parameters(), lr=1e-3)

# 确定dataset;
dataset = Loss.makeDataLoader()

再确定我们的epochs和最大时间步:

epochs = 40 #确定训练轮次;
timesteps = 300 #确定最大时间步;

接下来,就要开始我们的训练啦!

2、训练

我们先定义一个训练时的辅助函数,用来为我们的训练批次分组:

# 定义一个函数num_to_groups,接受两个参数:num(被分组的数)和divisor(每组的大小);
def num_to_groups(num, divisor):
    groups = num // divisor 
    remainder = num % divisor  # 使用模运算符'%'计算分组后的余数;
    arr = [divisor] * groups  # 创建一个列表,包含'groups'个'divisor',即完全分组的列表;
    if remainder > 0: 
        arr.append(remainder)  # 将余数作为一个新的组添加到列表中;
    return arr  # 返回包含所有组的列表;

save_and_sample_every = 1000

然后,开始我们正式的训练过程:

for epoch in range(epochs):
    for step, batch in enumerate(dataset):
        optimizer.zero_grad()
        batch_size = batch.shape[0]
        batch = batch.to(device)

        # 任取一个作为时间步进行下降;
        t = torch.randint(0, timesteps, (batch_size,), device=device).long()
        # 获取Loss,并计算得到结果;
        loss = Loss.p_losses(model, 
                             batch, 
                             t, 
                             elements[5],
                             elements[6],
                             loss_type="huber")

        if step % 100 == 0:
            print("Loss:", loss.item())

        # 反向传播;
        loss.backward()
        optimizer.step()

torch.save(model.state_dict(),'./model.pt')

这些都是很常规的东西,不再进行详细的介绍了。训练后的Loss打印出来是这样:

我们训练时打印出来的Loss。

到此,我们的模型就训练完成啦!接下来,我们要做的,就是开始我们的反向去噪过程,来让我们的去噪器能够从一张完整采样的高斯噪声中“拉”出我们的图像。

七、反向去噪过程(ReverseProcess.py)

反向去噪过程,其实就是严格按照我们的算法2进行的。

为了不用来回翻,笔者在此再放一次图。

我们首先要搞定第4步,也就是这个很多密密麻麻公式的这一步。这是我们推导出来的东西。相关的代码如下:

@torch.no_grad()
def p_sample(model, 
             x,  # 当前的样本;
             t,  # 当前的时间步;
             betas,
             sqrt_recip_alphas, 
             sqrt_one_minus_alphas_cumprod, 
             posterior_variance, 
             t_index,
             timesteps=300):
    betas_t = extract(betas, t, x.shape)  # 提取当前时间步的β;
    sqrt_one_minus_alphas_cumprod_t = extract(  # 提取当前时间步的1-α的累积乘积的平方根;
        sqrt_one_minus_alphas_cumprod, t, x.shape)
     # 提取当前时间步的α的平方根的倒数;
    sqrt_recip_alphas_t = extract(sqrt_recip_alphas, t, x.shape) 
    
    model_mean = sqrt_recip_alphas_t * (  # 计算模型的均值;
        x - betas_t * model(x, t) / sqrt_one_minus_alphas_cumprod_t)

    if t_index == 0:  # 如果是第一个时间步;
        return model_mean  # 直接返回模型的均值;
    else:  # 如果不是第一个时间步;
         # 提取当前时间步的后验方差;
        posterior_variance_t = extract(posterior_variance, t, x.shape) 
        noise = torch.randn_like(x) # 生成与x形状相同的随机噪声;
        
        # 返回带有噪声的模型均值;
        return model_mean + torch.sqrt(posterior_variance_t) * noise  

上面的代码是一步采样的结果。我们需要执行整个时间步的采样,去实现我们最后的效果。

接下来是执行整个算法2:

# 执行整个模型采样过程;
@torch.no_grad()
def p_sample_loop(model, 
                  shape,
                  betas,
                  sqrt_recip_alphas, 
                  sqrt_one_minus_alphas_cumprod, 
                  posterior_variance, 
                  timesteps=300):
    device = next(model.parameters()).device

    b = shape[0]
    img = torch.randn(shape, device=device)
    imgs = []

    # 添加加载条加载进度;
    for i in tqdm(reversed(range(0, timesteps)), desc='sampling loop time step', total=timesteps):
        img = p_sample(model, 
                       img, 
                       torch.full((b,), i, device=device, dtype=torch.long), 
                       betas,
                       sqrt_recip_alphas, 
                       sqrt_one_minus_alphas_cumprod, 
                       posterior_variance, 
                       i)
        imgs.append(img.cpu().numpy())
    return imgs

然后确定一个接口执行采样:

# 执行采样;
@torch.no_grad()
def sample(model, 
           image_size, 
           betas,
           sqrt_recip_alphas, 
           sqrt_one_minus_alphas_cumprod, 
           posterior_variance,
           batch_size=16, 
           channels=3):
    return p_sample_loop(model, 
                         (batch_size, channels, image_size, image_size),
                         betas,
                         sqrt_recip_alphas, 
                         sqrt_one_minus_alphas_cumprod, 
                         posterior_variance)

搞定!接下来就让我们开始我们的test吧!已经迫不及待的想看我们模型的输出啦!

八、测试结果(test.py)

我们的测试,实际上就是用我们的模型执行我们的反向去噪的整个过程。开始吧!首先是导入的包:

import torch
import Model
import ForwardProcess as FP
import ReverseProcess as RP
import numpy as np
from PIL import Image
import os

首先,先导入我们的模型和参数:

# 在gpu上使用程序;
device = "cuda" if torch.cuda.is_available() else "cpu"

# 确定模型参数;
image_size = 64
channels = 3
model = Model.Unet(
    dim=image_size,
    channels=channels,
    dim_mults=(1, 2, 4,)
)
model.to(device)

# 加载模型;
model.load_state_dict(torch.load('model.pt'))
model.eval()

# 获得公式计算的相关参数;
elements = FP.GetElements()

接下来,开始执行我们的反向去噪过程:

samples = RP.sample(model, 
                    image_size=image_size,
                    batch_size=32, 
                    channels=channels,
                    betas=elements[0],
                    sqrt_recip_alphas=elements[4],
                    sqrt_one_minus_alphas_cumprod=elements[6],
                    posterior_variance=elements[7])

print("shape of samples:",np.shape(samples))
samples = torch.tensor(samples, dtype=torch.float32)

在这一步,这个变量samples的维度是(timesteps, batch_size, channels, image_size, image_size)。我们需要把我们想要的图像保存下来。

先整理一个文件夹:

if not os.path.exists('results'):# 当前目录下没有文件夹就创造一个。
    os.makedirs('results')

然后遍历时间步,取我们需要的图片并保存下来:

# 遍历每个时间步
for i in range(samples.shape[0]):
    # 随机选择一个图像;
    image_index = 25
    image = samples[i, image_index]
    # 将数值缩放到0-255之内;
    img_normalized = ((image - image.min()) * (255 / (image.max() - image.min())))
    # 将numpy数组转换为PIL图像;
    img_normalized = img_normalized.numpy().astype(np.int8)
    img_normalized = np.transpose(img_normalized, (1, 2, 0))
    img_pil = Image.fromarray(img_normalized, 'RGB')
    # 保存图像,图像名称为时间步的名称;
    img_pil.save(f'results/time_step_{i}.png')

print("ending.")

我们的图片保存在当下的results文件夹中。采样的图片长这样:

部分采样的图片。

有些看不清?那让笔者将0步、49步、99步、149步、199步、249步和299步的生成图片单独拉出来看一下:

0步、49步、99步、149步、199步、249步和299步采样图。

我们可以看到,图片逐渐显现出了披萨的模样!这个多么小的数据量和训练批次居然能达到这样的效果,真的很棒!

我们也跑完了整个扩散模型的流程啦!

总结

能跟随笔者的代码,一步一步的到达这里,是一件非常不容易的事情。Diffusion的地位在图像生成领域和Transformer一样重要。当今很多惊人效果的展现,越发的体现出了DDPM的重要性和学习该算法的必要性啦!

笔者也会继续解析一些对目前现实影响深远的模型架构和实现方法。让我们一起加油吧!学无止境!有什么不是很懂的地方,可以在评论区询问!笔者也会一一回答的啦!

让我们一起共同进步!

总结

更新时间 2024-09-05