从零实现 LLM Training:003. Simple DDP
在实现了 mini-GPT 以及简单的 training loop 之后,我们可以上一些最简单的分布式训练,比如数据并行。数据并行本质上就是每张卡跑相同的模型,但是使用不同的数据,每张卡各自 forward backward 之后通过 all-reduce 来得到平均梯度。
为了简单起见,这里我们直接使用 pytorch 的数据并行,只需要额外添加一个 train_ddp.py 文件即可。
train_ddp.py
import os
import torch
import torch.distributed as dist
from config import GPTConfig
from model import GPTModel
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, Dataset, DistributedSampler
class ToyRandomDataset(Dataset):
def __init__(self, vocab_size: int, seq_len: int, num_samples: int):
self.vocab_size = vocab_size
self.seq_len = seq_len
self.num_samples = num_samples
def __len__(self):
return self.num_samples
def __getitem__(self, idx):
input_ids = torch.randint(
low=0,
high=self.vocab_size,
size=(self.seq_len,),
dtype=torch.long,
)
labels = input_ids.clone()
attention_mask = torch.ones(self.seq_len, dtype=torch.long)
return {
"input_ids": input_ids,
"labels": labels,
"attention_mask": attention_mask,
}
def setup_distributed():
dist.init_process_group(backend="nccl")
local_rank = int(os.environ["LOCAL_RANK"])
torch.cuda.set_device(local_rank)
device = torch.device("cuda", local_rank)
return device, local_rank
def cleanup_distributed():
dist.destroy_process_group()
def is_main_process(local_rank: int) -> bool:
return local_rank == 0
def main():
device, local_rank = setup_distributed()
if is_main_process(local_rank):
print(f"[rank {local_rank}] Using device: {device}")
config = GPTConfig(
vocab_size=10000,
max_position_embeddings=128,
n_layers=2,
n_heads=4,
d_model=128,
d_ff=512,
dropout=0.1,
)
model = GPTModel(config).to(device)
ddp_model = DDP(
model,
device_ids=[device.index],
output_device=device.index,
find_unused_parameters=False,
)
dataset = ToyRandomDataset(
vocab_size=config.vocab_size,
seq_len=32,
num_samples=1000,
)
sampler = DistributedSampler(
dataset,
num_replicas=dist.get_world_size(),
rank=dist.get_rank(),
shuffle=True,
)
dataloader = DataLoader(
dataset,
batch_size=8,
sampler=sampler,
)
optimizer = torch.optim.AdamW(ddp_model.parameters(), lr=3e-4)
ddp_model.train()
num_steps = 50
step = 0
for epoch in range(1, 1000):
sampler.set_epoch(epoch)
for batch in dataloader:
step += 1
if step > num_steps:
break
input_ids = batch["input_ids"].to(device) # [B, T]
labels = batch["labels"].to(device) # [B, T]
attention_mask = batch["attention_mask"].to(device) # [B, T]
optimizer.zero_grad()
logits, loss = ddp_model(
input_ids=input_ids,
attention_mask=attention_mask,
labels=labels,
)
loss.backward()
optimizer.step()
if is_main_process(local_rank) and step % 10 == 0:
print(f"[step {step} / {num_steps}] loss = {loss.item():.4f}")
if step > num_steps:
break
if is_main_process(local_rank):
print("Training finished.")
cleanup_distributed()
if __name__ == "__main__":
main()
这里我们依然是使用一个随机的 Toy Dataset,每次生成一个长度为 seq_len 的 id 列表,然后 labels 暂时等于生成的 id 列表(为了简单起见)。
在 setup_distributed 的时候我们需要初始化分布式的环境,需要调用 dist.init_process_group(backend="nccl"),然后我们需要用 torchrun 来运行这个代码,torchrun 会自动给每张卡注入对应的 LOCAL_RANK 环境变量,所以我们可以通过 int(os.environ["LOCAL_RANK"]) 拿到对应的 local rank。
在构造完模型后,需要用 DDP 给包装一下,从而使用到数据并行的能力,实际的效果是每个 rank 拿一份自己的 model,放到对应的 GPU 上,DDP 则负责在 loss.backward() 的时候,让每个 rank 各自算梯度,用 all_reduce 把梯度求平均,让所有参数保持同步。
在构造数据集时,需要额外构造一个 DistributedSampler 来传给 DataLoader,他会把整个 dataset 切成 world size 份,每个 rank 只看到其中的一份。防止不同 GPU 重复使用同一批样本。其中还有一句 sampler.set_epoch(epoch) 是为了在每个 epoch 打乱时采用不同的随机种子。
剩下的逻辑就和之前单卡训练非常相似了。
运行
DDP 的运行需要 torchrun,命令如下:
$ torchrun --nproc-per-node=2 train_ddp.py
W1126 10:45:37.836000 2760123 site-packages/torch/distributed/run.py:792]
W1126 10:45:37.836000 2760123 site-packages/torch/distributed/run.py:792] *****************************************
W1126 10:45:37.836000 2760123 site-packages/torch/distributed/run.py:792] Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed.
W1126 10:45:37.836000 2760123 site-packages/torch/distributed/run.py:792] *****************************************
[rank 0] Using device: cuda:0
[step 10 / 50] loss = 9.3437
[step 20 / 50] loss = 9.3408
[step 30 / 50] loss = 9.3145
[step 40 / 50] loss = 9.3891
[step 50 / 50] loss = 9.3715
Training finished.
其中 --nproc-per-node=2 表示每个节点有两张卡,然后默认情况下我们是一个节点(一台机器)。
这个执行之后,会启动两个进程(rank0, rank1),并给进程注入环境变量:
RANK表示全局 GPU 索引(多机多卡里每卡唯一)WORLD_SIZE表示总共有多少张卡LOCAL_RANK表示他是本机的第几张卡