diffusion-models-class-CN/unit1/README_CN.md at main · darcula1993/diffusion-models-class-CN · GitHub
你可以使用命令行来通过此令牌登录 (huggingface-cli login
) 或者运行以下单元来登录:
from huggingface_hub import notebook_login
notebook_login()
https://huggingface.co/settings/tokens
在上面的网站中
获取令牌登录
直接调包的例子
导入将要使用的库,并定义一些方便函数,稍后将会在 Notebook 中使用这些函数:
import numpy as np
import torch
import torch.nn.functional as F
from matplotlib import pyplot as plt
from PIL import Image
def show_images(x):
"""Given a batch of images x, make a grid and convert to PIL"""
x = x * 0.5 + 0.5 # Map from (-1, 1) back to (0, 1)
grid = torchvision.utils.make_grid(x)
grid_im = grid.detach().cpu().permute(1, 2, 0).clip(0, 1) * 255
grid_im = Image.fromarray(np.array(grid_im).astype(np.uint8))
return grid_im
def make_grid(images, size=64):
"""Given a list of PIL images, stack them together into a line for easy viewing"""
output_im = Image.new("RGB", (size * len(images), size))
for i, im in enumerate(images):
output_im.paste(im.resize((size, size)), (i * size, 0))
return output_im
# Mac users may need device = 'mps' (untested)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
加载管道
from diffusers import StableDiffusionPipeline
# Check out https://huggingface.co/sd-dreambooth-library for loads of models from the community
model_id = "sd-dreambooth-library/mr-potato-head"
# Load the pipeline
pipe = StableDiffusionPipeline.from_pretrained(model_id, torch_dtype=torch.float16).to(
device
)
管道加载完成后,我们可以使用以下命令生成图像:
# prompt = "an abstract oil painting of sks mr potato head by picasso"
prompt = "A dancing elephant"
image = pipe(prompt, num_inference_steps=50, guidance_scale=7.5).images[0]
image
Diffusers 的核心 API 被分为三个主要部分:
- 管道: 从高层出发设计的多种类函数,旨在以易部署的方式,能够做到快速通过主流预训练好的扩散模型来生成样本。
- 模型: 训练新的扩散模型时用到的主流网络架构,e.g. UNet.
- 管理器 (or 调度器): 在 推理 中使用多种不同的技巧来从噪声中生成图像,同时也可以生成在 训练 中所需的带噪图像。
能够生成小蝴蝶图片的管道。
from diffusers import DDPMPipeline
# Load the butterfly pipeline
butterfly_pipeline = DDPMPipeline.from_pretrained(
"johnowhitaker/ddpm-butterflies-32px"
# ).to(device)
).to("cuda")
# Create 8 images
images = butterfly_pipeline(batch_size=8).images
# View the result
make_grid(images)
下面这里会是最终的结果:
自己构建管道
下载一个训练数据集
在这个例子中,我们会用到一个来自 Hugging Face Hub 的图像集。具体来说,是个 1000 张蝴蝶图像收藏集. 这是个非常小的数据集,我们这里也同时包含了已被注释的内容指向一些规模更大的选择。如果你想使用你自己的图像收藏,你也可以使用这里被注释掉的示例代码,从一个指定的文件夹来装载图片。
# %pip install -qq -U datasets
import torchvision
from datasets import load_dataset
from torchvision import transforms
import torch
dataset = load_dataset("huggan/smithsonian_butterflies_subset", split="train")
# Or load images from a local folder
# dataset = load_dataset("imagefolder", data_dir="path/to/folder")
# We'll train on 32-pixel square images, but you can try larger sizes too
image_size = 32
# You can lower your batch size if you're running out of GPU memory
batch_size = 64
# Define data augmentations
preprocess = transforms.Compose(
[
transforms.Resize((image_size, image_size)), # Resize
transforms.RandomHorizontalFlip(), # Randomly flip (data augmentation)
transforms.ToTensor(), # Convert to tensor (0, 1)
transforms.Normalize([0.5], [0.5]), # Map to (-1, 1)
]
)
def transform(examples):
images = [preprocess(image.convert("RGB")) for image in examples["image"]]
return {"images": images}
dataset.set_transform(transform)
# Create a dataloader from the dataset to serve up the transformed images in batches
train_dataloader = torch.utils.data.DataLoader(
dataset, batch_size=batch_size, shuffle=True
)
我们可以从中取出一批图像数据来看一看他们是什么样子:
xb = next(iter(train_dataloader))["images"].to("cuda")[:8]
print("X shape:", xb.shape)
show_images(xb).resize((8 * 64, 64), resample=Image.NEAREST)
定义管理器
我们的训练计划是,取出这些输入图片然后对它们增添噪声,在这之后把带噪的图片送入模型。在推理阶段,我们将用模型的预测值来不断迭代去除这些噪点。在diffusers
中,这两个步骤都是由 管理器(调度器) 来处理的。
噪声管理器决定在不同的迭代周期时分别加入多少噪声。我们可以这样创建一个管理器,是取自于训练并能取样 'DDPM' 的默认配置。 (基于此篇论文 "Denoising Diffusion Probabalistic Models":
from diffusers import DDPMScheduler
noise_scheduler = DDPMScheduler(num_train_timesteps=1000)
DDPM 论文这样来描述一个损坏过程,为每一个 ' 迭代周期 '(timestep) 增添一点少量的噪声。设在某个迭代周期有 xt−1, 我们可以得到它的下一个版本 xt (比之前更多一点点噪声):
这就是说,我们取 xt−1, 给他一个 的系数,然后加上带有 βt 系数的噪声。 这里 β 是根据一些管理器来为每一个 t 设定的,来决定每一个迭代周期中添加多少噪声。 现在,我们不想把这个推演进行 500 次来得到 x500,所以我们用另一个公式来根据给出的 x0 计算得到任意 t 时刻的 xt:
数学符号看起来总是很可怕!好在有管理器来为我们完成这些运算。我们可以画出 α¯t−−√ (标记为sqrt_alpha_prod
) 和 (1−α¯t)−−−−−−−√ (标记为sqrt_one_minus_alpha_prod
) 来看一下输入 (x) 与噪声是如何在不同迭代周期中量化和叠加的:
plt.plot(noise_scheduler.alphas_cumprod.cpu() ** 0.5, label=r"${\sqrt{\bar{\alpha}_t}}$")
plt.plot((1 - noise_scheduler.alphas_cumprod.cpu()) ** 0.5, label=r"$\sqrt{(1 - \bar{\alpha}_t)}$")
plt.legend(fontsize="x-large");
噪声越来越大
不论你选择了哪一个管理器 (调度器),我们现在都可以使用noise_scheduler.add_noise
功能来添加不同程度的噪声,就像这样:
timesteps = torch.linspace(0, 999, 8).long().to(device)
noise = torch.randn_like(xb)
noisy_xb = noise_scheduler.add_noise(xb, noise, timesteps)
print("Noisy X shape", noisy_xb.shape)
show_images(noisy_xb).resize((8 * 64, 64), resample=Image.NEAREST)
定义模型
大多数扩散模型使用的模型结构都是一些 [U-net] 的变形 (https://arxiv.org/abs/1505.04597) 也是我们在这里会用到的结构。
概括来说:
- 输入模型中的图片经过几个由 ResNetLayer 构成的层,其中每层都使图片尺寸减半。
- 之后在经过同样数量的层把图片升采样。
- 其中还有对特征在相同位置的上、下采样层残差连接模块。
模型一个关键特征既是,输出图片尺寸与输入图片相同,这正是我们这里需要的。
Diffusers 为我们提供了一个易用的UNet2DModel
类,用来在 PyTorch 创建所需要的结构。
我们来使用 U-net 为我们生成目标大小的图片吧。 注意这里down_block_types
对应下采样模块 (上图中绿色部分), 而up_block_types
对应上采样模块 (上图中红色部分):
from diffusers import UNet2DModel
# Create a model
model = UNet2DModel(
sample_size=image_size, # the target image resolution
in_channels=3, # the number of input channels, 3 for RGB images
out_channels=3, # the number of output channels
layers_per_block=2, # how many ResNet layers to use per UNet block
block_out_channels=(64, 128, 128, 256), # More channels -> more parameters
down_block_types=(
"DownBlock2D", # a regular ResNet downsampling block
"DownBlock2D",
"AttnDownBlock2D", # a ResNet downsampling block with spatial self-attention
"AttnDownBlock2D",
),
up_block_types=(
"AttnUpBlock2D",
"AttnUpBlock2D", # a ResNet upsampling block with spatial self-attention
"UpBlock2D",
"UpBlock2D", # a regular ResNet upsampling block
),
)
model.to(device);
当在处理更高分辨率的输入时,你可能想用更多层的下、上采样模块,让注意力层只聚焦在最低分辨率(最底)层来减少内存消耗。我们在之后会讨论该如何实验来找到最适用与你手头场景的配置方法。
我们可以通过输入一批数据和随机的迭代周期数来看输出是否与输入尺寸相同:
with torch.no_grad():
model_prediction = model(noisy_xb, timesteps).sample
model_prediction.shape
创建训练循环
终于可以训练了!下面这是 PyTorch 中经典的优化迭代循环,在这里一批一批的送入数据然后通过优化器来一步步更新模型参数 - 在这个样例中我们使用学习率为 0.0004 的 AdamW 优化器。
对于每一批的数据,我们要
- 随机取样几个迭代周期
- 根据预设为数据加入噪声
- 把带噪数据送入模型
- 使用 MSE 作为损失函数来比较目标结果与模型预测结果(在这里是加入噪声的场景)
- 通过
loss.backward ()
与optimizer.step ()
来更新模型参数
在这个过程中我们记录 Loss 值用来后续的绘图。
NB: 这段代码大概需 10 分钟来运行 - 你也可以跳过以下两块操作直接使用预训练好的模型。供你选择,你可以探索下通过缩小模型层中的通道数会对运行速度有多少提升。
# Set the noise scheduler
noise_scheduler = DDPMScheduler(
num_train_timesteps=1000, beta_schedule="squaredcos_cap_v2"
)
# Training loop
optimizer = torch.optim.AdamW(model.parameters(), lr=4e-4)
losses = []
for epoch in range(30):
for step, batch in enumerate(train_dataloader):
clean_images = batch["images"].to(device)
# Sample noise to add to the images
noise = torch.randn(clean_images.shape).to(clean_images.device)
bs = clean_images.shape[0]
# Sample a random timestep for each image
timesteps = torch.randint(
0, noise_scheduler.num_train_timesteps, (bs,), device=clean_images.device
).long()
# Add noise to the clean images according to the noise magnitude at each timestep
noisy_images = noise_scheduler.add_noise(clean_images, noise, timesteps)
# Get the model prediction
noise_pred = model(noisy_images, timesteps, return_dict=False)[0]
# Calculate the loss
loss = F.mse_loss(noise_pred, noise)
loss.backward(loss)
losses.append(loss.item())
# Update the model parameters with the optimizer
optimizer.step()
optimizer.zero_grad()
if (epoch + 1) % 5 == 0:
loss_last_epoch = sum(losses[-len(train_dataloader) :]) / len(train_dataloader)
print(f"Epoch:{epoch+1}, loss: {loss_last_epoch}")
绘制 loss 曲线,我们能看到模型在一开始快速的收敛,接下来以一个较慢的速度持续优化(我们用右边 log 坐标轴的视图可以看的更清楚):
fig, axs = plt.subplots(1, 2, figsize=(12, 4))
axs[0].plot(losses)
axs[1].plot(np.log(losses))
plt.show()
生成图像
方法 1:建立一个管道:
from diffusers import DDPMPipeline
image_pipe = DDPMPipeline(unet=model, scheduler=noise_scheduler)
pipeline_output = image_pipe()
pipeline_output.images[0]
我们可以在本地文件夹这样保存一个管道:
image_pipe.save_pretrained("my_pipeline")
检查文件夹的内容:
!ls my_pipeline/
方法 2:写一个取样循环
从随机噪声开始,遍历管理器的迭代周期来看从最嘈杂直到最微小的噪声变化,基于模型的预测一步步减少一些噪声:
# Random starting point (8 random images):
sample = torch.randn(8, 3, 32, 32).to(device)
for i, t in enumerate(noise_scheduler.timesteps):
# Get model pred
with torch.no_grad():
residual = model(sample, t).sample
# Update sample with step
sample = noise_scheduler.step(residual, t, sample).prev_sample
show_images(sample)
把你的模型 Push 到 Hub
在上面的例子中我们把管道保存在了本地。把模型 push 到 hub 上,我们会需要建立模型和相应文件的仓库名。我们根据你的选择(模型 ID)来决定仓库的名字(大胆的去替换掉model_name
吧;需要包含你的用户名,get_full_repo_name ()
会帮你做到):
from huggingface_hub import get_full_repo_name
model_name = "zhzhzhzhzhz-butterflies-32"
hub_model_id = get_full_repo_name(model_name)
hub_model_id
然后,在 🤗 Hub 上创建模型仓库并 push 它吧:
from huggingface_hub import HfApi, create_repo
create_repo(hub_model_id)
api = HfApi()
api.upload_folder(
folder_path="my_pipeline/scheduler", path_in_repo="", repo_id=hub_model_id
)
api.upload_folder(folder_path="my_pipeline/unet", path_in_repo="", repo_id=hub_model_id)
api.upload_file(
path_or_fileobj="my_pipeline/model_index.json",
path_in_repo="model_index.json",
repo_id=hub_model_id,
)
最后一件事是创建一个超棒的模型卡,如此,我们的蝴蝶生成器可以轻松的在 Hub 上被找到(请在描述中随意发挥!):
from huggingface_hub import ModelCard
content = f"""
---
license: mit
tags:
- pytorch
- diffusers
- unconditional-image-generation
- diffusion-models-class
---
# Model Card for Unit 1 of the [Diffusion Models Class 🧨](https://github.com/huggingface/diffusion-models-class)
This model is a diffusion model for unconditional image generation of cute 🦋.
## Usage
```python
from diffusers import DDPMPipeline
pipeline = DDPMPipeline.from_pretrained('{hub_model_id}')
image = pipeline().images[0]
image
```
"""
card = ModelCard(content)
card.push_to_hub(hub_model_id)
现在模型已经在 Hub 上了,你可以这样从任何地方使用DDPMPipeline
的from_pretrained ()
方法来下来它:
from diffusers import DDPMPipeline
hub_model_id = "https://huggingface.co/zhzhzhzhzhz/zhzhzhzhzhz-butterflies-32"
image_pipe = DDPMPipeline.from_pretrained(hub_model_id)
pipeline_output = image_pipe()
pipeline_output.images[0]