Data-Parallel Training
The purpose of this tutorial is to demonstrate the structure of PyTorch code meant to parallelize training over large sets of data across multiple GPUs. In this example we use the PyTorch Distributed Data Parallel (DDP) implementation to accomplish this task.
First we import the necessary libraries:
import torch
import mlflow
from torch.utils.data import Dataset
from torchvision import datasets
from torchvision.transforms import ToTensor
from torch.utils.data import DataLoader
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group
import os
Then we define the function that performs the necessary DDP configuration:
def ddp_setup(rank, world_size):
    """
    rank: Unique id of each process
    world_size: Total number of processes
    """
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "12355"
    init_process_group(backend="nccl", rank=rank, world_size=world_size)
where “rank” is the unique identifier of each GPU/process and “world_size” is the total number of processes, i.e. the number of available GPUs across which the parallel processes will be distributed. The environment variables “MASTER_ADDR” and “MASTER_PORT” must also be set to establish communication amongst the GPUs. The function defined here is standard and should work in most cases.
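The address and port above are hard-coded; when several training jobs might share a node, or when these values are supplied by a job scheduler, it can be convenient to read them from the environment instead. The following is a minimal sketch of such a variation (the function name ddp_setup_flexible and the use of setdefault are illustrative choices, not part of the script above); it reuses the imports from the top of the script:
def ddp_setup_flexible(rank, world_size):
    # keep any externally provided rendezvous settings; otherwise fall back to the defaults above
    os.environ.setdefault("MASTER_ADDR", "localhost")
    os.environ.setdefault("MASTER_PORT", "12355")
    init_process_group(backend="nccl", rank=rank, world_size=world_size)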
We can now define our NN class as usual:
class SeqNet(nn.Module):
    def __init__(self, input_size, hidden_size1, hidden_size2, output_size):
        super(SeqNet, self).__init__()
        self.lin1 = nn.Linear(input_size, hidden_size1)
        self.lin2 = nn.Linear(hidden_size1, hidden_size2)
        self.lin3 = nn.Linear(hidden_size2, output_size)

    def forward(self, x):
        x = torch.flatten(x, 1)        # flatten each 28x28 image into a vector of length 784
        x = self.lin1(x)
        x = torch.sigmoid(x)           # sigmoid activation
        x = self.lin2(x)
        x = F.log_softmax(x, dim=1)
        out = self.lin3(x)
        return out
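Before adding any DDP machinery, it can be useful to confirm that the network produces outputs of the expected shape. A quick, illustrative check (not part of the training script) could look like this:
net = SeqNet(784, 200, 200, 10)
dummy_batch = torch.rand(4, 1, 28, 28)   # four fake 28x28 single-channel images
print(net(dummy_batch).shape)            # expected: torch.Size([4, 10])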
Next, a training function must be defined:
def train(model, train_loader, loss_function, optimizer, rank, num_epochs):
    model.to(rank)
    model = DDP(model, device_ids=[rank])
    for epoch in range(num_epochs):
        running_loss = 0.0
        model.train()
        for i, (images, labels) in enumerate(train_loader):
            images = torch.div(images, 255.)
            images, labels = images.to(rank), labels.to(rank)
            optimizer.zero_grad()
            outputs = model(images)
            loss = loss_function(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        average_loss = running_loss / len(train_loader)
        if rank == 0:   # only the first process reports the per-epoch loss, to avoid duplicated output
            print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {average_loss:.4f}")
    print("Training on GPU " + str(rank) + " finished.")
This involves the same steps as training on a single device, except that the model must now be wrapped in DDP via the model = DDP(model, device_ids=[rank]) call.
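A common companion to such a training loop is checkpointing from a single process: since every replica holds identical weights after each optimizer step, it is enough for one rank to save them. The snippet below is a minimal sketch of this idea, not part of the script above; the file name checkpoint.pt is an arbitrary example:
# inside train(), after the model has been wrapped in DDP:
if rank == 0:
    # model.module is the underlying SeqNet; saving it avoids "module."-prefixed keys in the state dict
    torch.save(model.module.state_dict(), "checkpoint.pt")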
It is also necessary to define a function that prepares our DataLoaders, which will handle the distribution of data across the different processes/GPUs:
def prepare_dataloader(dataset, batch_size):
    return DataLoader(
        dataset,
        batch_size=batch_size,
        pin_memory=True,
        shuffle=False,   # shuffling is delegated to the DistributedSampler
        sampler=DistributedSampler(dataset)
    )
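Note that shuffling is left to the DistributedSampler. If a different shuffling order is wanted in every epoch, the sampler's set_epoch() method should be called at the start of each epoch; the training loop above does not do this, so a sketch of the addition would be:
for epoch in range(num_epochs):
    train_loader.sampler.set_epoch(epoch)   # re-seed the sampler so each epoch uses a new ordering
    # ... rest of the epoch loop as in train() above ...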
Using DDP also requires the explicit definition of a “main” function, as it will be called on each device/process:
def main(rank, world_size):
    ddp_setup(rank, world_size)

    # Model and parameters
    input_size = 784
    hidden_size1 = 200
    hidden_size2 = 200
    output_size = 10
    num_epochs = 10
    batch_size = 100
    lr = 0.01

    my_net = SeqNet(input_size, hidden_size1, hidden_size2, output_size)
    optimizer = torch.optim.Adam(my_net.parameters(), lr=lr)
    loss_function = nn.CrossEntropyLoss()

    fmnist_train = datasets.FashionMNIST(root="data", train=True, download=True, transform=ToTensor())
    fmnist_train_loader = prepare_dataloader(fmnist_train, batch_size)

    train(my_net, fmnist_train_loader, loss_function, optimizer, rank, num_epochs)
    destroy_process_group()
Note that the clean-up function destroy_process_group() must be called at the end of “main”.
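If anything in train() raises an exception, the process group would otherwise be left open. One way to guarantee the clean-up runs is a try/finally block; this is only a sketch of that variation, not how the script above is structured:
def main(rank, world_size):
    ddp_setup(rank, world_size)
    try:
        # ... build the model and data loader and call train() as above ...
        pass
    finally:
        destroy_process_group()   # always executed, even if training fails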
We can now write the part of our code that checks for the number of available GPUs and launches a copy of our “main” function, with its corresponding part of the data, on each GPU using mp.spawn():
if __name__ == "__main__":
    world_size = torch.cuda.device_count()   # number of available GPUs
    print("Number of available GPUs: " + str(world_size))
    mp.spawn(main, args=(world_size,), nprocs=world_size)   # launch one process per GPU
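mp.spawn() passes the process index (the rank) as the first argument to “main” automatically, so args only needs to supply world_size. The example assumes at least one visible GPU; a small, hypothetical guard (not part of the original script) could make that requirement explicit:
if __name__ == "__main__":
    world_size = torch.cuda.device_count()
    if world_size == 0:
        raise RuntimeError("No GPUs detected; this DDP example needs at least one GPU.")
    mp.spawn(main, args=(world_size,), nprocs=world_size)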
Download the full script used in this example here