# Some Strategies for Using Multiple Nodes of GPUs
Using multiple GPUs is one option to speed up your code. On Apocrita, we have V100, A100 and H100 GPUs available, with up to 4 GPUs per node. On other compute clusters, JADE2 has 8 V100 GPUs per node and Sulis has 3 A100 GPUs per node. If your problem is pleasingly parallel, you can distribute identical or similar tasks to each GPU on a node, or even on multiple nodes.
In Python, there are libraries which can help you use multiple GPUs on one node or across multiple nodes. The library `multiprocessing`, covered in another blog post, can be used to create multiple processes within a node, and each of those processes can utilise a GPU. For multiple nodes, `mpi4py` can run copies of your code, with different parameters, on different nodes. After running your code, you can use `mpi4py` to gather results from the different nodes to be processed. It works well with schedulers such as UGE, used here on Apocrita, and Slurm, used on JADE2 and Sulis.

In this blog, we will demonstrate the use of multiple GPUs with `multiprocessing` and `mpi4py`, on Apocrita and Sulis, using multiple GPUs and also multiple nodes of GPUs. Rather than a walkthrough, this blog will cover some general strategies and tips for using `multiprocessing` and `mpi4py` together.
There are a few ways to use multiple GPUs and we will discuss them here.
## Array Jobs
The easiest way to use multiple GPUs is to run an array job. This allows you to run multiple independent jobs when resources become available, not necessarily simultaneously. This is very beginner-friendly and requires minimal changes to your code. However, this only works if your problem can be split into independent jobs. In addition, you will have to write and run another job script to process the results from the array job when it finishes.
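For example, on Apocrita a UGE array job is requested with the `-t` option, and each task reads its `$SGE_TASK_ID` to work out which piece of work to do. Below is a minimal sketch of such a job script; the script `my_gpu_task.py` and its `--task-id` option are hypothetical stand-ins for your own code, and the resource requests are illustrative only.

```bash
#!/bin/bash
#$ -cwd
#$ -j y
#$ -l h_rt=1:0:0
#$ -l gpu=1
#$ -t 1-4

module load python/3.10.7

# Each of the 4 tasks gets a different $SGE_TASK_ID (1 to 4), which the
# (hypothetical) script uses to select its own parameters
python my_gpu_task.py --task-id "${SGE_TASK_ID}"
```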
## `multiprocessing`
You can use `multiprocessing` to spawn new Python processes which can run in parallel within a node.

A common way to use `multiprocessing` is `Pool.map()`, which spawns new processes, each running the same function but with different parameters. In the example below, we create a `Pool` to call `say_hello()` with different names.
```python
import multiprocessing


def say_hello(name):
    print(f"Hello {name}")


if __name__ == "__main__":
    names = [
        "Barry",
        "Alice",
        "Barbara",
        "Tom",
    ]
    with multiprocessing.Pool(len(names)) as pool:
        pool.map(say_hello, names)
```
In a similar spirit, if you have multiple GPUs, you can use `Pool.map()` to use those GPUs at the same time. In the example below, we use all detectable GPUs simultaneously and put the number `12345` into each GPU's memory.
```python
import multiprocessing

import cupy
from cupy import cuda


def use_gpu(gpu_id):
    with cuda.Device(gpu_id):
        cupy.asarray(12345)
        print(f"Using GPU {gpu_id}")


if __name__ == "__main__":
    multiprocessing.set_start_method("forkserver")
    gpu_ids = range(cuda.runtime.getDeviceCount())
    with multiprocessing.Pool(len(gpu_ids)) as pool:
        pool.map(use_gpu, gpu_ids)
```
The function `cuda.runtime.getDeviceCount()` returns the number of GPUs detected. We then spawn that many processes, each calling the function `use_gpu()`.
**Start methods**

Using `multiprocessing` with multiple GPUs cannot work when forking processes. You must either spawn processes or create a fork server. This can be done, respectively, with

```python
if __name__ == "__main__":
    multiprocessing.set_start_method("spawn")
```

and

```python
if __name__ == "__main__":
    multiprocessing.set_start_method("forkserver")
```

where `if __name__ == "__main__":` ensures only the main or parent process sets `multiprocessing`'s behaviour. We do not want the spawned processes themselves to spawn even more processes. There is more documentation on the different start methods.
I personally prefer to create a subclass of `multiprocessing.Process` and override the `run()` method. This has the advantage of using a class structure to represent each process. You can assign all sorts of objects to its member variables, for example, `Queue`s and `Pipe`s to allow communication between processes (a `Queue` sketch is shown after the next example). Below is the same example but using `multiprocessing.Process`.
```python
import multiprocessing

import cupy
from cupy import cuda


class Worker(multiprocessing.Process):

    def __init__(self, gpu_id):
        super().__init__()
        self.gpu_id = gpu_id

    def run(self):
        with cuda.Device(self.gpu_id):
            cupy.asarray(12345)
            print(f"Using GPU {self.gpu_id}")


if __name__ == "__main__":
    multiprocessing.set_start_method("forkserver")
    gpu_ids = range(cuda.runtime.getDeviceCount())
    workers = [Worker(gpu_id) for gpu_id in gpu_ids]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
```
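To illustrate the communication mentioned above, here is a minimal sketch (not part of the original example) where each `Worker` is given a `multiprocessing.Queue` and puts a result on it for the parent process to collect; the doubling of `12345` is just a stand-in for a real GPU calculation.

```python
import multiprocessing

import cupy
from cupy import cuda


class Worker(multiprocessing.Process):

    def __init__(self, gpu_id, queue):
        super().__init__()
        self.gpu_id = gpu_id
        self.queue = queue

    def run(self):
        with cuda.Device(self.gpu_id):
            # A stand-in GPU calculation; send the result back to the parent
            result = cupy.asarray(12345) * 2
            self.queue.put((self.gpu_id, int(result.get())))


if __name__ == "__main__":
    multiprocessing.set_start_method("forkserver")
    queue = multiprocessing.Queue()
    gpu_ids = range(cuda.runtime.getDeviceCount())
    workers = [Worker(gpu_id, queue) for gpu_id in gpu_ids]
    for worker in workers:
        worker.start()
    # Collect one result per worker before joining
    results = [queue.get() for _ in workers]
    for worker in workers:
        worker.join()
    print(results)
```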
## `mpi4py`
MPI (message passing interface) is a way to run multiple copies of your code simultaneously. Each copy, or MPI process, is assigned a rank which uniquely identifies it. MPI processes can run within a single node or across multiple nodes, and they can communicate with each other.
On Apocrita, when using multiple nodes, this is known as a parallel job.
Returning to the `say_hello()` example, below is an MPI version called `mpi_example.py`.
```python
from mpi4py import MPI

COMM = MPI.COMM_WORLD
RANK = COMM.Get_rank()


def say_hello(name):
    print(f"Hello {name}")


if __name__ == "__main__":
    names = [
        "Barry",
        "Alice",
        "Barbara",
        "Tom",
    ]
    say_hello(names[RANK])
```
Running `python mpi_example.py` will produce

```
Hello Barry
```
What happened here is that `python mpi_example.py` starts only one MPI process, which has `RANK = 0`. Thus only Barry was greeted.
To invoke multiple MPI processes on your personal computer, use `mpirun` with the option `-n` followed by the number of processes you want, for example

```bash
mpirun -n 4 python mpi_example.py
```
This may produce the output

```
Hello Alice
Hello Barbara
Hello Barry
Hello Tom
```
In this example, there are 4 MPI processes running `mpi_example.py`. Each MPI process has a different value of `RANK`, enabling each process to greet a different person.
MPI becomes very useful should you wish to use multiple nodes of GPUs. It is also useful if you want to write code, within this MPI framework, where each process only uses one GPU. This can then scale up to multiple GPUs on multiple nodes, where you run an MPI process for each GPU.
For MPI processes to communicate with each other, `mpi4py` provides some very useful functions, for example, `gather()` to collate results from all MPI processes.
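As a minimal sketch (separate from the examples in the rest of this blog), each MPI process could compute a partial result and `gather()` would collect them into a list on rank 0:

```python
from mpi4py import MPI

COMM = MPI.COMM_WORLD
RANK = COMM.Get_rank()

if __name__ == "__main__":
    # Each MPI process computes its own partial result
    partial_result = RANK ** 2
    # gather() collates the results into a list on the root process
    results = COMM.gather(partial_result, root=0)
    if RANK == 0:
        print(results)  # with 4 processes: [0, 1, 4, 9]
```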
We will cover two ways MPI processes can be distributed across nodes:
- Create an MPI process for each GPU
- Create an MPI process for each node
This will give some hints and ideas on how to use multiple nodes of GPUs. We will demonstrate using Apocrita and Sulis, which have nodes of GPUs connected with InfiniBand.
## MPI Process for Each GPU ("Pure" MPI approach)
### Sulis and Slurm
When deploying an MPI process for each GPU on multiple nodes, we can write code which works out which GPU to use on a node given a `RANK`. Here's an example script called `mpi_per_gpu.py`:
```python
import platform

import cupy
from cupy import cuda
from mpi4py import MPI

COMM = MPI.COMM_WORLD
RANK = COMM.Get_rank()


def use_gpu(gpu_id):
    with cuda.Device(gpu_id):
        cupy.asarray(12345)
        print(f"Rank {RANK} on {platform.node()} using GPU {gpu_id}")


if __name__ == "__main__":
    gpu_id = RANK % cuda.runtime.getDeviceCount()
    use_gpu(gpu_id)
```
In this example code, the function `use_gpu()` allocates memory on a specified GPU and prints the node's name and the MPI process' `RANK`. This is to confirm how the MPI processes are distributed across multiple nodes.
On Sulis, Slurm allocates MPI processes using the fill-up rule, where the scheduler allocates a process for each GPU on one node before moving on to the next node. Thus `gpu_id = RANK % cuda.runtime.getDeviceCount()` should assign `gpu_id` a number between 0 inclusive and `cuda.runtime.getDeviceCount()` exclusive on each node. For example, with 3 GPUs per node, ranks 0, 1 and 2 on the first node are assigned GPUs 0, 1 and 2, and ranks 3, 4 and 5 on the second node are also assigned GPUs 0, 1 and 2.
Unlike our personal computers, Slurm allocates resources to each MPI process. In order to allocate a process per GPU, let's take a look at the GPU node specifications of Sulis. Each node has
- 2 x AMD EPYC 7742 (Rome) 2.25 GHz 64-core processors (i.e. 128 cores per node)
- 512 GB DDR4-3200 RAM per node (i.e. 4 GB per core)
- 3 x NVIDIA A100 40 GB RAM passively-cooled
Each node has 3 GPUs and we want an MPI process per GPU. Following from this, we could suggest the following allocations:

- 3 processes (or tasks) per node
- 42 CPU cores per process (128 cores shared between 3 processes, rounded down)
- 3.85 GB per CPU core

These allocations are provided in the job script. The example below requests 2 nodes
```bash
#!/bin/bash
#SBATCH --nodes=2
#SBATCH --ntasks-per-node=3
#SBATCH --cpus-per-task=42
#SBATCH --mem-per-cpu=3850
#SBATCH --gres=gpu:ampere_a100:3
#SBATCH --partition=gpu
#SBATCH --time=00:10:00
#SBATCH --account=su008

module purge
module load GCC/10.2.0
module load CUDA/11.1.1
module load OpenMPI/4.0.5
module load CuPy/8.5.0

srun python mpi_per_gpu.py
```
where `srun` is the replacement for `mpirun` when using Slurm. An example output of this is

```
Rank 3 on gpu009.sulis.hpc using GPU 0
Rank 1 on gpu008.sulis.hpc using GPU 1
Rank 4 on gpu009.sulis.hpc using GPU 1
Rank 5 on gpu009.sulis.hpc using GPU 2
Rank 0 on gpu008.sulis.hpc using GPU 0
Rank 2 on gpu008.sulis.hpc using GPU 2
```
which confirms that there were 3 MPI processes per node, each MPI process using a GPU. We observe the fill-up allocation happening here because the first three ranks are on `gpu008.sulis.hpc` and the remaining ranks are on `gpu009.sulis.hpc`.
### Apocrita and UGE
On Apocrita, to request multiple nodes of GPUs, you use `-pe parallel`. However, there isn't a one-to-one equivalent of Slurm's `--ntasks-per-node` on UGE. Apocrita's `-pe parallel` will default to an MPI process per CPU core using the fill-up allocation rule.

To get around this, we suggest using `intelmpi`'s round-robin allocation rule, which places the MPI processes evenly across nodes, one at a time. We can adjust the script `mpi_per_gpu.py` accordingly by changing `gpu_id`:
```python
import platform

import cupy
from cupy import cuda
from mpi4py import MPI

COMM = MPI.COMM_WORLD
RANK = COMM.Get_rank()
SIZE = COMM.Get_size()


def use_gpu(gpu_id):
    with cuda.Device(gpu_id):
        cupy.asarray(12345)
        print(f"Rank {RANK} on {platform.node()} using GPU {gpu_id}")


if __name__ == "__main__":
    n_node = SIZE / cuda.runtime.getDeviceCount()
    gpu_id = RANK // int(n_node)
    use_gpu(gpu_id)
```
where we want `SIZE` to be the total number of GPUs across all nodes, so `n_node` is the number of nodes. With round-robin allocation, consecutive ranks are placed on different nodes, so integer-dividing `RANK` by the number of nodes maps each rank to a different GPU on its node. For example, with 2 nodes of 4 GPUs, `SIZE = 8` and `n_node = 2`, so ranks 0 and 1 (on different nodes) both use GPU 0, ranks 2 and 3 use GPU 1, and so on.
Here's an example bash script which requests 2 nodes of 4 GPUs to run this code on.
```bash
#!/bin/bash
#$ -l h_rt=240:0:0
#$ -pe parallel 96
#$ -l gpu=4
#$ -l gpu_type=ampere
#$ -cwd
#$ -j y
#$ -q test.q

module load intelmpi/2022.2
module load cuda/12.0.0
module load python/3.10.7

source ../venv/bin/activate

mpirun -rr -n 8 python mpi_per_gpu.py
```
Do note the following:

- `-pe parallel 96` requests 96 CPU cores, which is equivalent to two nodes
- `-l gpu=4` requests 4 GPUs per node
- `venv` is some virtual environment containing the Python packages we need
- With `intelmpi`, `mpirun` has the option `-rr` to use the round-robin allocation rule
- `mpirun` has the option `-n 8` to allocate 8 MPI processes
The job's resulting output may look something like this

```
Rank 1 on rdg12 using GPU 0
Rank 0 on rdg13 using GPU 0
Rank 2 on rdg13 using GPU 1
Rank 6 on rdg13 using GPU 3
Rank 4 on rdg13 using GPU 2
Rank 3 on rdg12 using GPU 1
Rank 5 on rdg12 using GPU 2
Rank 7 on rdg12 using GPU 3
```
which shows there is a unique rank for each node-GPU combination, meaning there is an MPI process for each GPU. The round-robin allocation is verified by noticing that the even ranks are on `rdg13` and the odd ranks are on `rdg12`.
## MPI Process for Each Node (Hybrid Approach)
Another option is to allocate an MPI process per node. Each MPI process uses `multiprocessing` to utilise all GPUs on a node. This is also known as a hybrid approach. This works because MPI runs copies of your code, including code which uses multiple processes or threads.
**MPI forking processes**

An MPI process cannot fork more processes. Using `multiprocessing.set_start_method("fork")` will upset `mpi4py`. Please use

```python
if __name__ == "__main__":
    multiprocessing.set_start_method("spawn")
```

or

```python
if __name__ == "__main__":
    multiprocessing.set_start_method("forkserver")
```

as explained previously.
Below is an example Python script, called `mpi_per_node.py`, which uses all GPUs on a node and prints the node's name and the MPI process' `RANK`.
```python
import multiprocessing
import platform

import cupy
from cupy import cuda

if __name__ == "__main__":
    multiprocessing.set_start_method("forkserver")
    from mpi4py import MPI
    COMM = MPI.COMM_WORLD
    RANK = COMM.Get_rank()


class Worker(multiprocessing.Process):

    def __init__(self, gpu_id, rank):
        super().__init__()
        self.gpu_id = gpu_id
        self.rank = rank

    def run(self):
        with cuda.Device(self.gpu_id):
            cupy.asarray(12345)
            print(f"Rank {self.rank} on {platform.node()} "
                  f"using GPU {self.gpu_id}")


if __name__ == "__main__":
    gpu_ids = range(cuda.runtime.getDeviceCount())
    workers = [Worker(gpu_id, RANK) for gpu_id in gpu_ids]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
```
**Import guard**

The import statement `from mpi4py import MPI` will trigger initialisation behind the scenes for MPI to work. We have to ensure this import statement is only done by the parent process. Thus any processes created by `multiprocessing` should not use any `mpi4py` code, including importing it. To do this, I suggest an import guard, shown below, to ensure only the parent process imports `mpi4py`.

```python
if __name__ == "__main__":
    from mpi4py import MPI
```

We placed the guard at the top of the code so that import statements remain at the top of the code.
The above example is very similar to the code shown in the `multiprocessing` example.
Continuing with Sulis and Slurm, we can allocate one entire node per MPI process:

- 1 process (or task) per node
- 128 CPU cores per process
- 3.85 GB per CPU core

This can be put in a job script requesting, for example, 2 nodes.
```bash
#!/bin/bash
#SBATCH --nodes=2
#SBATCH --ntasks-per-node=1
#SBATCH --cpus-per-task=128
#SBATCH --mem-per-cpu=3850
#SBATCH --gres=gpu:ampere_a100:3
#SBATCH --partition=gpu
#SBATCH --time=00:10:00
#SBATCH --account=su008

module purge
module load GCC/10.2.0
module load CUDA/11.1.1
module load OpenMPI/4.0.5
module load CuPy/8.5.0

srun python mpi_per_node.py
```
An example output of the job is

```
Rank 1 on gpu009.sulis.hpc using GPU 1
Rank 1 on gpu009.sulis.hpc using GPU 2
Rank 1 on gpu009.sulis.hpc using GPU 0
Rank 0 on gpu008.sulis.hpc using GPU 2
Rank 0 on gpu008.sulis.hpc using GPU 0
Rank 0 on gpu008.sulis.hpc using GPU 1
```
which verifies that there is an MPI process per node, each using all GPUs on the node.
Similarly, on Apocrita, the bash script may look something like this
```bash
#!/bin/bash
#$ -l h_rt=240:0:0
#$ -pe parallel 96
#$ -l gpu=4
#$ -l gpu_type=ampere
#$ -cwd
#$ -j y
#$ -q test.q

module load intelmpi/2022.2
module load cuda/12.0.0
module load python/3.10.7

source ../venv/bin/activate

mpirun -rr -n 2 python mpi_per_node.py
```
The script `mpi_per_node.py` works on both Apocrita and Sulis without change, so it is independent of the allocation rule.
## Discussion
We've covered four methods to use multiple nodes of GPUs:

- Array jobs
- `multiprocessing`
- MPI (using `mpi4py` with an MPI process per GPU)
- Hybrid MPI + `multiprocessing` (using `mpi4py` with an MPI process per node)
I believe that there is a use case for these different methods, depending on whether your problem is more suited to be split per node or GPU. I also think it's sensible to choose a method based on your preferred style of programming.
For example, when writing code suitable for array jobs, you want your code to run given a unique parameter for each job. On Apocrita, that unique parameter is your `$SGE_TASK_ID`. Similarly, for MPI, each MPI process has a unique `RANK` and runs code given that `RANK`. This could be described as encapsulating one process in one script. As we saw in the MPI example, the MPI approach seems simpler: given a `RANK`, work out which GPU to use and use it. It is also easily scalable to any number of GPUs and nodes you give it.
On the other hand, with `multiprocessing` and the hybrid approach, you are responsible for handling each process on a node in your code. This can be unwieldy but very useful if you want more fine-grained control over your processes, for example, if you want to run additional CPU processes while the GPU processes are running.
The hybrid approach is also appropriate when you already have existing `multiprocessing` code and want to extend it to use multiple nodes. As we saw in the examples, the `multiprocessing` code and the hybrid code are very similar.
With `multiprocessing` and the hybrid approach, you also have the opportunity to work in a shared memory paradigm. However, with Python's GIL, I personally find this is quite rare and can be difficult to work with.
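For completeness, here is a minimal sketch (not from the examples above) of what the shared memory paradigm could look like using the standard library's `multiprocessing.shared_memory`, where a NumPy array is shared between the parent and a child process; this is an illustrative assumption, not something required by the approaches in this blog.

```python
import multiprocessing
from multiprocessing import shared_memory

import numpy as np


def worker(shm_name, shape, dtype):
    # Attach to the existing shared memory block by name
    shm = shared_memory.SharedMemory(name=shm_name)
    array = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
    array += 1  # modify the shared data in place
    shm.close()


if __name__ == "__main__":
    multiprocessing.set_start_method("forkserver")
    data = np.zeros(4)
    shm = shared_memory.SharedMemory(create=True, size=data.nbytes)
    shared = np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf)
    shared[:] = data
    process = multiprocessing.Process(
        target=worker, args=(shm.name, data.shape, data.dtype))
    process.start()
    process.join()
    print(shared)  # [1. 1. 1. 1.]
    shm.close()
    shm.unlink()
```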
As we saw in the MPI example, do note that schedulers may allocate processes differently, which may affect your code. The hybrid approach works well here as it is independent of the allocation rule.
There are many more ways to use multiple nodes of GPUs which we haven't touched on, for example, Dask and PyTorch's `DistributedDataParallel`. But the tools shown in this blog should be a good starting point for assessing what would be suitable for you to start using multiple nodes of GPUs.