上篇。https://blog.csdn.net/alxws/article/details/140058117?spm=1001.2014.3001.5502四、前向加噪过程(ForwardProcess.py)
当我们的去噪器设计完成后,接下来,就是我们的另外两个重要的部分:前向加噪过程和反向去噪过程了。
我们先看看论文原文中的那个算法介绍:
左边为训练过程,右边为去噪采样过程。
在训练过程中,我们是通过那个公式模拟一步到加噪t步的图片。然后,我们将这个图片来假装是模型自己获得的t步的图片,并在此基础上进行“加噪”。
我们将模型生成的噪声与我们在高斯噪声采样中得到的结果做差,来模拟基于某张个时间步后得到的噪声。之后我们将在去噪过程中,按照公式完成我们的图片的生成。
那么,开始!首先看看我们的前向加噪过程是怎么实现的吧!
这是我们导入的包:
import torch
import torch.nn.functional as F
from torchvision.transforms import Compose, ToTensor, Lambda, ToPILImage, CenterCrop, Resize,RandomHorizontalFlip
import numpy as np
1、 beta的选择
我们在前面简单介绍过程的时候,说了一嘴,且。实际上,这个 𝛽 的递增序列我们有各种各样不同的写法。最常见的就是让这个序列线性增长:
# 定义一个线性beta调度函数,生成一个线性增长的beta值序列。
def linear_beta_schedule(timesteps):
# 设置beta值的起始和结束值。
beta_start = 0.0001
beta_end = 0.02
# 返回一个从起始值到结束值线性增长的序列。
return torch.linspace(beta_start, beta_end, timesteps)
或者使用二次函数的方式去生成这个序列:
# 定义一个二次方beta调度函数,生成一个二次方增长的beta值序列。
def quadratic_beta_schedule(timesteps):
# 设置beta值的起始和结束值。
beta_start = 0.0001
beta_end = 0.02
# 返回一个从起始值的平方根到结束值的平方根线性增长的序列,并将其平方。
return torch.linspace(beta_start**0.5, beta_end**0.5, timesteps) ** 2
或者是“缓-急-缓”,即从sigmod函数中采样得到对应的序列:
# 定义一个S形beta调度函数,生成一个S形曲线的beta值序列。
def sigmoid_beta_schedule(timesteps):
# 设置beta值的起始和结束值。
beta_start = 0.0001
beta_end = 0.02
# 创建一个从-6到6的等间隔序列,用于sigmoid函数的输入。
betas = torch.linspace(-6, 6, timesteps)
# 应用sigmoid函数并调整序列,使其在指定的起始和结束值之间。
return torch.sigmoid(betas) * (beta_end - beta_start) + beta_start
还有人在这方面做过相关的研究,不过我们不去深入探究了。在本文中,笔者使用的还是线性调度的方式去生成相关的序列。
2、 其余相关参数的生成
有了,我们剩下的要做的,就是获得我们需要的一系列参数。让我们先来回顾一下比较重要的两个公式:
一步到位的前向加噪过程公式: 基于上述公式的反向去噪过程:其中有很多参数都是我们可以直接计算得到的。即通过我们的GetElements()函数得到。其中输出的结果有这样的对应关系:
betas对应;
alphas对应 ;
sqrt_recip_alphas对应 ;
alphas_cumprod对应;
alphas_cumprod_prev对应;
sqrt_alphas_cumprod对应;
sqrt_one_minus_alphas_cumprod对应;
posterior_variance对应。
让我们使用下面的函数来获得我们的结果:
# 获取相关参数数值;
def GetElements(timesteps=300):
betas = linear_beta_schedule(timesteps=timesteps)
# 通过公式定义\bar{alpha}_t。
alphas = 1. - betas
alphas_cumprod = torch.cumprod(alphas, axis=0) # 返回逐步累乘结果;
alphas_cumprod_prev = F.pad(alphas_cumprod[:-1], (1, 0), value=1.0)
sqrt_recip_alphas = torch.sqrt(1.0 / alphas)
# 计算q(x_t | x_{t-1});
sqrt_alphas_cumprod = torch.sqrt(alphas_cumprod)
sqrt_one_minus_alphas_cumprod = torch.sqrt(1. - alphas_cumprod)
# 计算 q(x_{t-1} | x_t, x_0);
posterior_variance = betas * (1. - alphas_cumprod_prev) / (1. - alphas_cumprod)
return [betas,# β参数,控制噪声的加入;
alphas,
alphas_cumprod,
alphas_cumprod_prev,
sqrt_recip_alphas, # α的平方根的倒数;
sqrt_alphas_cumprod,
sqrt_one_minus_alphas_cumprod, # 1-α的累积乘积的平方根;
posterior_variance] # 后验方差;
3、前向加噪过程部分
接下来,就是公式的直接应用!让我们使用这个公式来完成我们的一步加噪过程吧!
# 定义前向加噪过程,即一步骤到位的加噪过程;
def q_sample(x_start,
t,
sqrt_alphas_cumprod,
sqrt_one_minus_alphas_cumprod,
noise=None):
if noise is None:
noise = torch.randn_like(x_start)
sqrt_alphas_cumprod_t = extract(sqrt_alphas_cumprod, t, x_start.shape)
sqrt_one_minus_alphas_cumprod_t = extract(
sqrt_one_minus_alphas_cumprod, t, x_start.shape
)
return sqrt_alphas_cumprod_t * x_start + sqrt_one_minus_alphas_cumprod_t * noise
其中有一个函数extract(),它的作用是从输入张量中提取特定的元素,并将它们重塑成与输入张量 x_shape 相关的形状。
# 从输入张量a中提取特定的元素,并将它们重塑成与输入张量 x_shape 相关的形状;
def extract(a, t, x_shape):
batch_size = t.shape[0] # 获取批次大小,即t张量的第一个维度的大小;
# 使用gather函数根据t张量中的索引来提取a张量中的元素;
out = a.gather(-1, t.cpu())
# 重塑提取出的元素,使其形状与x_shape的前n-1个维度相匹配,同时保持批次大小不变;
reshaped_out = out.reshape(batch_size, *((1,) * (len(x_shape) - 1)))
return reshaped_out.to(t.device)
现在,假设我们要对一张图片进行加噪。让我们看看这个加噪过程是怎么完成的吧!以每五十步加噪采样得到的结果组成一组套图来康康:
t=5,55,105,155,205的加噪图。
现在,整个加噪过程都已经完成了!但我们是要将输入的图片转化为张量后放入到其中进行运算的。因此我们需要一些辅助函数来帮忙计算一下。比如这个图像转张量的函数:
# 将一张图像转化为一个张量;
def image2tensor(image,image_size=64):
transform = Compose([
Resize(image_size),
CenterCrop(image_size),
RandomHorizontalFlip(),
ToTensor(),
Lambda(lambda t: (t * 2) - 1),
])
return transform(image)
还有将张量转化为图像的函数:
# 将一个张量转化为一张图像;
def tensor2image(tensor):
reverse_transform = Compose([
Lambda(lambda t: (t + 1) / 2),
Lambda(lambda t: t.permute(1, 2, 0)), # CHW to HWC
Lambda(lambda t: t * 255.),
Lambda(lambda t: t.numpy().astype(np.uint8)),
ToPILImage(),
])
return reverse_transform(tensor.squeeze())
到此,我们的前向传播过程就全部结束啦!根据算法1,我们接下来要做的就是Loss相关的计算了,并且构建我们的反向传播过程。先看看Loss吧!
五、损失函数的设计、训练集的构成(Loss.py)
有了前向加噪过程,我们要做的,就是在此基础上进行Loss的设计了。这其实相当的容易。
下面是我们要导入的包:
import ForwardProcess as FP
import torch.nn.functional as F
import torch
from PIL import Image
import os
from torch.utils.data import Dataset, DataLoader
首先,是我们的Loss设计。
1、Loss设计
根据算法1,我们的Loss要做的其实很简单。我们实际上就是做算法1的4到5步。
为了不用来回翻,笔者在此再放一次图。
则Loss设计的代码如下:
# 损失函数;
def p_losses(denoise_model,
x_start,
t,
sqrt_alphas_cumprod,
sqrt_one_minus_alphas_cumprod,
noise=None,
loss_type="l1"):
# 如果没有提供噪声,就创建一个与x_start形状相同的随机噪声;
if noise is None:
noise = torch.randn_like(x_start)
# 通过前向过程获得某一时间步的时候的加噪图像;
x_noisy = FP.q_sample(x_start=x_start,
t=t,
sqrt_alphas_cumprod=sqrt_alphas_cumprod,
sqrt_one_minus_alphas_cumprod=sqrt_one_minus_alphas_cumprod,
noise=noise)
# 让模型输出其结果;
predicted_noise = denoise_model(x_noisy, t)
# 根据不同type设计loss下降;
if loss_type == 'l1':
loss = F.l1_loss(noise, predicted_noise)
elif loss_type == 'l2':
loss = F.mse_loss(noise, predicted_noise)
elif loss_type == "huber":
loss = F.smooth_l1_loss(noise, predicted_noise)
else:
raise NotImplementedError()
return loss
我们是通过公式模拟一步到加噪t步的图片。然后,我们将这个图片来假装是模型自己获得的t步的图片,并在此基础上进行“加噪”。
2、数据集构建
然后,我们就可以构建自己的数据集了。笔者采用的是ImageNet64x64数据集,并且使用了里面的pizza图。在文件夹里一眼望过去长这样:
每张图片尺寸大小为64x64,且均为RGB图像,一共3000张。
笔者便根据此构建数据集。是很常见的构建方法,正常重写__len__和__getitem__:
# DataLoader的创立;
class CIFARdataset(Dataset):
def __init__(self, dataset_folder, transform=None):
super(CIFARdataset, self).__init__()
self.dataset_folder = dataset_folder
self.transform = transform
self.image_files = \
[os.path.join(self.dataset_folder, file) for file in os.listdir(self.dataset_folder)]
def __len__(self):
return len(self.image_files)
def __getitem__(self, idx):
image_path = self.image_files[idx]
image = Image.open(str(image_path)).convert('RGB')
if self.transform:
image = self.transform(image)
return image
然后创建一个DataLoader来管理数据:
# 创建dataloader;
def makeDataLoader(datapath="dataset"):
dataset = CIFARdataset(datapath, transform=FP.image2tensor)
dataloader = DataLoader(dataset,batch_size=32,shuffle=True)
return dataloader
到此,我们的整个文件就全部写完啦!接下来,让我们再跑一遍算法1,来进行一遍完整的训练过程。
六、训练过程(train.py)
有了前向加噪过程和Loss的支持,我们就可以完成训练的整个过程了!其实就是完整跑一遍论文中的算法1啦。
首先,先导入一系列的包:
import torch
import Loss
import Model
import ForwardProcess as FP
from torch.optim import Adam
from torchvision.utils import save_image
我们需要做一些训练前的准备。
1、参数、模型加载
首先,我们先加载我们的一些参数:
# 获得超参数;
elements = FP.GetElements()
# 其中不同的参数的含义;
# [betas,# β参数,控制噪声的加入; 0
# alphas, 1
# alphas_cumprod, 2
# alphas_cumprod_prev, 3
# sqrt_recip_alphas, # α的平方根的倒数; 4
# sqrt_alphas_cumprod, 5
# sqrt_one_minus_alphas_cumprod, # 1-α的累积乘积的平方根; 6
# posterior_variance] # 后验方差; 7
然后加载我们的模型:
# 在gpu上使用程序;
device = "cuda" if torch.cuda.is_available() else "cpu"
# 确定模型参数;
image_size = 64
channels = 3
model = Model.Unet(
dim=image_size,
channels=channels,
dim_mults=(1, 2, 4,)
)
model.to(device)
然后加载优化器和数据集:
# 确定优化器;
optimizer = Adam(model.parameters(), lr=1e-3)
# 确定dataset;
dataset = Loss.makeDataLoader()
再确定我们的epochs和最大时间步:
epochs = 40 #确定训练轮次;
timesteps = 300 #确定最大时间步;
接下来,就要开始我们的训练啦!
2、训练
我们先定义一个训练时的辅助函数,用来为我们的训练批次分组:
# 定义一个函数num_to_groups,接受两个参数:num(被分组的数)和divisor(每组的大小);
def num_to_groups(num, divisor):
groups = num // divisor
remainder = num % divisor # 使用模运算符'%'计算分组后的余数;
arr = [divisor] * groups # 创建一个列表,包含'groups'个'divisor',即完全分组的列表;
if remainder > 0:
arr.append(remainder) # 将余数作为一个新的组添加到列表中;
return arr # 返回包含所有组的列表;
save_and_sample_every = 1000
然后,开始我们正式的训练过程:
for epoch in range(epochs):
for step, batch in enumerate(dataset):
optimizer.zero_grad()
batch_size = batch.shape[0]
batch = batch.to(device)
# 任取一个作为时间步进行下降;
t = torch.randint(0, timesteps, (batch_size,), device=device).long()
# 获取Loss,并计算得到结果;
loss = Loss.p_losses(model,
batch,
t,
elements[5],
elements[6],
loss_type="huber")
if step % 100 == 0:
print("Loss:", loss.item())
# 反向传播;
loss.backward()
optimizer.step()
torch.save(model.state_dict(),'./model.pt')
这些都是很常规的东西,不再进行详细的介绍了。训练后的Loss打印出来是这样:
我们训练时打印出来的Loss。
到此,我们的模型就训练完成啦!接下来,我们要做的,就是开始我们的反向去噪过程,来让我们的去噪器能够从一张完整采样的高斯噪声中“拉”出我们的图像。
七、反向去噪过程(ReverseProcess.py)
反向去噪过程,其实就是严格按照我们的算法2进行的。
为了不用来回翻,笔者在此再放一次图。
我们首先要搞定第4步,也就是这个很多密密麻麻公式的这一步。这是我们推导出来的东西。相关的代码如下:
@torch.no_grad()
def p_sample(model,
x, # 当前的样本;
t, # 当前的时间步;
betas,
sqrt_recip_alphas,
sqrt_one_minus_alphas_cumprod,
posterior_variance,
t_index,
timesteps=300):
betas_t = extract(betas, t, x.shape) # 提取当前时间步的β;
sqrt_one_minus_alphas_cumprod_t = extract( # 提取当前时间步的1-α的累积乘积的平方根;
sqrt_one_minus_alphas_cumprod, t, x.shape)
# 提取当前时间步的α的平方根的倒数;
sqrt_recip_alphas_t = extract(sqrt_recip_alphas, t, x.shape)
model_mean = sqrt_recip_alphas_t * ( # 计算模型的均值;
x - betas_t * model(x, t) / sqrt_one_minus_alphas_cumprod_t)
if t_index == 0: # 如果是第一个时间步;
return model_mean # 直接返回模型的均值;
else: # 如果不是第一个时间步;
# 提取当前时间步的后验方差;
posterior_variance_t = extract(posterior_variance, t, x.shape)
noise = torch.randn_like(x) # 生成与x形状相同的随机噪声;
# 返回带有噪声的模型均值;
return model_mean + torch.sqrt(posterior_variance_t) * noise
上面的代码是一步采样的结果。我们需要执行整个时间步的采样,去实现我们最后的效果。
接下来是执行整个算法2:
# 执行整个模型采样过程;
@torch.no_grad()
def p_sample_loop(model,
shape,
betas,
sqrt_recip_alphas,
sqrt_one_minus_alphas_cumprod,
posterior_variance,
timesteps=300):
device = next(model.parameters()).device
b = shape[0]
img = torch.randn(shape, device=device)
imgs = []
# 添加加载条加载进度;
for i in tqdm(reversed(range(0, timesteps)), desc='sampling loop time step', total=timesteps):
img = p_sample(model,
img,
torch.full((b,), i, device=device, dtype=torch.long),
betas,
sqrt_recip_alphas,
sqrt_one_minus_alphas_cumprod,
posterior_variance,
i)
imgs.append(img.cpu().numpy())
return imgs
然后确定一个接口执行采样:
# 执行采样;
@torch.no_grad()
def sample(model,
image_size,
betas,
sqrt_recip_alphas,
sqrt_one_minus_alphas_cumprod,
posterior_variance,
batch_size=16,
channels=3):
return p_sample_loop(model,
(batch_size, channels, image_size, image_size),
betas,
sqrt_recip_alphas,
sqrt_one_minus_alphas_cumprod,
posterior_variance)
搞定!接下来就让我们开始我们的test吧!已经迫不及待的想看我们模型的输出啦!
八、测试结果(test.py)
我们的测试,实际上就是用我们的模型执行我们的反向去噪的整个过程。开始吧!首先是导入的包:
import torch
import Model
import ForwardProcess as FP
import ReverseProcess as RP
import numpy as np
from PIL import Image
import os
首先,先导入我们的模型和参数:
# 在gpu上使用程序;
device = "cuda" if torch.cuda.is_available() else "cpu"
# 确定模型参数;
image_size = 64
channels = 3
model = Model.Unet(
dim=image_size,
channels=channels,
dim_mults=(1, 2, 4,)
)
model.to(device)
# 加载模型;
model.load_state_dict(torch.load('model.pt'))
model.eval()
# 获得公式计算的相关参数;
elements = FP.GetElements()
接下来,开始执行我们的反向去噪过程:
samples = RP.sample(model,
image_size=image_size,
batch_size=32,
channels=channels,
betas=elements[0],
sqrt_recip_alphas=elements[4],
sqrt_one_minus_alphas_cumprod=elements[6],
posterior_variance=elements[7])
print("shape of samples:",np.shape(samples))
samples = torch.tensor(samples, dtype=torch.float32)
在这一步,这个变量samples的维度是(timesteps, batch_size, channels, image_size, image_size)。我们需要把我们想要的图像保存下来。
先整理一个文件夹:
if not os.path.exists('results'):# 当前目录下没有文件夹就创造一个。
os.makedirs('results')
然后遍历时间步,取我们需要的图片并保存下来:
# 遍历每个时间步
for i in range(samples.shape[0]):
# 随机选择一个图像;
image_index = 25
image = samples[i, image_index]
# 将数值缩放到0-255之内;
img_normalized = ((image - image.min()) * (255 / (image.max() - image.min())))
# 将numpy数组转换为PIL图像;
img_normalized = img_normalized.numpy().astype(np.int8)
img_normalized = np.transpose(img_normalized, (1, 2, 0))
img_pil = Image.fromarray(img_normalized, 'RGB')
# 保存图像,图像名称为时间步的名称;
img_pil.save(f'results/time_step_{i}.png')
print("ending.")
我们的图片保存在当下的results文件夹中。采样的图片长这样:
部分采样的图片。
有些看不清?那让笔者将0步、49步、99步、149步、199步、249步和299步的生成图片单独拉出来看一下:
0步、49步、99步、149步、199步、249步和299步采样图。
我们可以看到,图片逐渐显现出了披萨的模样!这个多么小的数据量和训练批次居然能达到这样的效果,真的很棒!
我们也跑完了整个扩散模型的流程啦!
总结
能跟随笔者的代码,一步一步的到达这里,是一件非常不容易的事情。Diffusion的地位在图像生成领域和Transformer一样重要。当今很多惊人效果的展现,越发的体现出了DDPM的重要性和学习该算法的必要性啦!
笔者也会继续解析一些对目前现实影响深远的模型架构和实现方法。让我们一起加油吧!学无止境!有什么不是很懂的地方,可以在评论区询问!笔者也会一一回答的啦!
让我们一起共同进步!
总结