Add cuda to multigpu (xpu) bench #8386

Merged: 14 commits, Nov 16, 2023
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -13,7 +13,7 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
- Added support for `torch.compile` in `MultiAggregation` ([#8345](https://github.com/pyg-team/pytorch_geometric/pull/8345))
- Added support for `torch.compile` in `HeteroConv` ([#8344](https://github.com/pyg-team/pytorch_geometric/pull/8344))
- Added support for weighted `sparse_cross_entropy` ([#8340](https://github.com/pyg-team/pytorch_geometric/pull/8340))
- Added a multi GPU training benchmarks for XPU device ([#8288](https://github.com/pyg-team/pytorch_geometric/pull/8288))
- Added multi-GPU training benchmarks for CUDA and XPU devices ([#8288](https://github.com/pyg-team/pytorch_geometric/pull/8288), [#8386](https://github.com/pyg-team/pytorch_geometric/pull/8386))
- Support MRR computation in `KGEModel.test()` ([#8298](https://github.com/pyg-team/pytorch_geometric/pull/8298))
- Added an example for model parallelism (`examples/multi_gpu/model_parallel.py`) ([#8309](https://github.com/pyg-team/pytorch_geometric/pull/8309))
- Added a tutorial for multi-node multi-GPU training with pure PyTorch ([#8071](https://github.com/pyg-team/pytorch_geometric/pull/8071))
17 changes: 11 additions & 6 deletions benchmark/multi_gpu/training/README.md
@@ -1,16 +1,21 @@
# Training Benchmark

## Environment setup
## Running benchmark on CUDA GPU

Optional, XPU only:
Run the benchmark, e.g. assuming you have `n` NVIDIA GPUs:
```
python training_benchmark_cuda.py --dataset ogbn-products --model edge_cnn --num-epochs 3 --n-gpus <n>
```

## Running benchmark on Intel GPU

## Environment setup
```
install intel_extension_for_pytorch
install oneccl_bindings_for_pytorch
```

## Running benchmark

Run benchmark, e.g. assuming you have 2 GPUs:
Run the benchmark, e.g. assuming you have `n` XPUs:
```
mpirun -np 2 python training_benchmark.py --dataset ogbn-products --model edge_cnn --num-epochs 3
mpirun -np <n> python training_benchmark_xpu.py --dataset ogbn-products --model edge_cnn --num-epochs 3
```
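
A quick way to pick a sensible `n` for the CUDA section above is to ask PyTorch how many devices it can see; `training_benchmark_cuda.py` clamps `--n-gpus` to this count anyway. The snippet below is an illustrative sketch, not part of this PR, and assumes a CUDA-enabled PyTorch build:

```
import torch

# Number of visible CUDA devices; a reasonable upper bound for --n-gpus.
# training_benchmark_cuda.py clamps the requested world size to this value.
print(f'CUDA devices visible: {torch.cuda.device_count()}')
```
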
benchmark/multi_gpu/training/common.py
@@ -1,17 +1,15 @@
import argparse
import ast
import os
from time import perf_counter
from typing import Any, Tuple, Union
from typing import Any, Callable, Tuple, Union

import intel_extension_for_pytorch as ipex
import oneccl_bindings_for_pytorch # noqa
import torch
import torch.distributed as dist
import torch.nn.functional as F
from torch.nn.parallel import DistributedDataParallel as DDP

from benchmark.utils import get_dataset, get_model, get_split_masks, test
from benchmark.utils import get_model, get_split_masks, test
from torch_geometric.data import Data, HeteroData
from torch_geometric.loader import NeighborLoader
from torch_geometric.nn import PNAConv

@@ -24,6 +22,7 @@

device_conditions = {
'xpu': (lambda: torch.xpu.is_available()),
'cuda': (lambda: torch.cuda.is_available()),
}


@@ -63,6 +62,8 @@ def train_hetero(model: Any, loader: NeighborLoader,
def maybe_synchronize(device: str):
if device == 'xpu' and torch.xpu.is_available():
torch.xpu.synchronize()
if device == 'cuda' and torch.cuda.is_available():
torch.cuda.synchronize()


def create_mask_per_rank(
@@ -83,7 +84,9 @@ def create_mask_per_rank(
return mask_per_rank


def run(rank: int, world_size: int, args: argparse.ArgumentParser):
def run(rank: int, world_size: int, args: argparse.ArgumentParser,
num_classes: int, data: Union[Data, HeteroData],
custom_optimizer: Callable[[Any, Any], Tuple[Any, Any]] = None):
if not device_conditions[args.device]():
raise RuntimeError(f'{args.device.upper()} is not available')

@@ -92,13 +95,8 @@ def run(rank: int, world_size: int, args: argparse.ArgumentParser):
if rank == 0:
print('BENCHMARK STARTS')
print(f'Running on {args.device.upper()}')

assert args.dataset in supported_sets.keys(
), f"Dataset {args.dataset} isn't supported."
if rank == 0:
print(f'Dataset: {args.dataset}')

data, num_classes = get_dataset(args.dataset, args.root)
hetero = True if args.dataset == 'ogbn-mag' else False
mask, val_mask, test_mask = get_split_masks(data, args.dataset)
mask = create_mask_per_rank(mask, rank, world_size, hetero)
@@ -192,8 +190,8 @@ def run(rank: int, world_size: int, args: argparse.ArgumentParser,

optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

if args.device == 'xpu':
model, optimizer = ipex.optimize(model, optimizer=optimizer)
if custom_optimizer:
model, optimizer = custom_optimizer(model, optimizer)

train = train_hetero if hetero else train_homo

@@ -248,37 +246,11 @@ def run(rank: int, world_size: int, args: argparse.ArgumentParser,
dist.destroy_process_group()


def get_dist_params() -> Tuple[int, int, str]:
master_addr = "127.0.0.1"
master_port = "29500"
os.environ["MASTER_ADDR"] = master_addr
os.environ["MASTER_PORT"] = master_port

mpi_rank = int(os.environ.get("PMI_RANK", -1))
mpi_world_size = int(os.environ.get("PMI_SIZE", -1))
rank = mpi_rank if mpi_world_size > 0 else os.environ.get("RANK", 0)
world_size = (mpi_world_size if mpi_world_size > 0 else os.environ.get(
"WORLD_SIZE", 1))

os.environ["RANK"] = str(rank)
os.environ["WORLD_SIZE"] = str(world_size)

init_method = f"tcp://{master_addr}:{master_port}"

return rank, world_size, init_method


if __name__ == '__main__':
rank, world_size, init_method = get_dist_params()
dist.init_process_group(backend="ccl", init_method=init_method,
world_size=world_size, rank=rank)

def get_predefined_args() -> argparse.ArgumentParser:
argparser = argparse.ArgumentParser(
'GNN distributed (DDP) training benchmark')
add = argparser.add_argument

add('--device', choices=['xpu'], default='xpu',
help='Device to run benchmark on')
add('--dataset', choices=['ogbn-mag', 'ogbn-products', 'Reddit'],
default='Reddit', type=str)
add('--model',
@@ -297,6 +269,4 @@ def get_dist_params() -> Tuple[int, int, str]:
add('--num-epochs', default=1, type=int)
add('--evaluate', action='store_true')

args = argparser.parse_args()

run(rank, world_size, args)
return argparser
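
The net effect of the refactor above is that the shared module no longer hard-codes `ipex`: device-specific preparation is injected through the optional `custom_optimizer` callback, which must return a `(model, optimizer)` pair. A minimal sketch of that contract follows; the `passthrough` helper is hypothetical and shown only for illustration, while `ipex_optimizer` mirrors `training_benchmark_xpu.py` below:

```
from typing import Any, Tuple

def passthrough(model: Any, optimizer: Any) -> Tuple[Any, Any]:
    # Backends that need no extra preparation (the CUDA entry point simply
    # omits the callback) would return the pair unchanged.
    return model, optimizer

def ipex_optimizer(model: Any, optimizer: Any) -> Tuple[Any, Any]:
    # XPU path: ipex.optimize returns an optimized (model, optimizer) pair.
    import intel_extension_for_pytorch as ipex
    return ipex.optimize(model, optimizer=optimizer)

# run(rank, world_size, args, num_classes, data, ipex_optimizer)
```
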
51 changes: 51 additions & 0 deletions benchmark/multi_gpu/training/training_benchmark_cuda.py
@@ -0,0 +1,51 @@
import argparse
import os
from typing import Union

import torch
import torch.distributed as dist
import torch.multiprocessing as mp

from benchmark.multi_gpu.training.common import (
get_predefined_args,
run,
supported_sets,
)
from benchmark.utils import get_dataset
from torch_geometric.data import Data, HeteroData


def run_cuda(rank: int, world_size: int, args: argparse.ArgumentParser,
num_classes: int, data: Union[Data, HeteroData]):
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12355'
dist.init_process_group('nccl', rank=rank, world_size=world_size)
run(rank, world_size, args, num_classes, data)


if __name__ == '__main__':
argparser = get_predefined_args()
argparser.add_argument('--n-gpus', default=1, type=int)
args = argparser.parse_args()
setattr(args, 'device', 'cuda')

assert args.dataset in supported_sets.keys(), \
f"Dataset {args.dataset} isn't supported."
data, num_classes = get_dataset(args.dataset, args.root)

max_world_size = torch.cuda.device_count()
chosen_world_size = args.n_gpus
if chosen_world_size <= max_world_size:
world_size = chosen_world_size
else:
print(f'User selected {chosen_world_size} GPUs '
f'but only {max_world_size} GPUs are available')
world_size = max_world_size
print(f'Let\'s use {world_size} GPUs!')

mp.spawn(
run_cuda,
args=(world_size, args, num_classes, data),
nprocs=world_size,
join=True,
)
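
One detail worth spelling out in the new CUDA entry point: `torch.multiprocessing.spawn` prepends the process index to the argument tuple, which is how `run_cuda` receives `rank` even though `args` only carries four values. A standalone sketch of that convention follows; the `worker` function is a toy stand-in, not part of the PR:

```
import torch.multiprocessing as mp

def worker(rank: int, world_size: int, message: str) -> None:
    # mp.spawn calls worker(i, *args) for i in range(nprocs),
    # so rank is injected automatically as the first argument.
    print(f'[{rank}/{world_size}] {message}')

if __name__ == '__main__':
    mp.spawn(worker, args=(2, 'hello'), nprocs=2, join=True)
```
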
53 changes: 53 additions & 0 deletions benchmark/multi_gpu/training/training_benchmark_xpu.py
@@ -0,0 +1,53 @@
import os
from typing import Any, Tuple

import intel_extension_for_pytorch as ipex
import oneccl_bindings_for_pytorch # noqa
import torch.distributed as dist

from benchmark.multi_gpu.training.common import (
get_predefined_args,
run,
supported_sets,
)
from benchmark.utils import get_dataset


def get_dist_params() -> Tuple[int, int, str]:
master_addr = "127.0.0.1"
master_port = "29500"
os.environ["MASTER_ADDR"] = master_addr
os.environ["MASTER_PORT"] = master_port

mpi_rank = int(os.environ.get("PMI_RANK", -1))
mpi_world_size = int(os.environ.get("PMI_SIZE", -1))
rank = mpi_rank if mpi_world_size > 0 else os.environ.get("RANK", 0)
world_size = (mpi_world_size if mpi_world_size > 0 else os.environ.get(
"WORLD_SIZE", 1))

os.environ["RANK"] = str(rank)
os.environ["WORLD_SIZE"] = str(world_size)

init_method = f"tcp://{master_addr}:{master_port}"

return rank, world_size, init_method


def custom_optimizer(model: Any, optimizer: Any) -> Tuple[Any, Any]:
return ipex.optimize(model, optimizer=optimizer)


if __name__ == '__main__':
rank, world_size, init_method = get_dist_params()
dist.init_process_group(backend="ccl", init_method=init_method,
world_size=world_size, rank=rank)

argparser = get_predefined_args()
args = argparser.parse_args()
setattr(args, 'device', 'xpu')

assert args.dataset in supported_sets.keys(), \
f"Dataset {args.dataset} isn't supported."
data, num_classes = get_dataset(args.dataset, args.root)

run(rank, world_size, args, num_classes, data, custom_optimizer)
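
Before paying for a full `mpirun` launch, it can help to confirm that the environment matches what this script expects. The check below is an illustrative sketch, not part of the PR; it assumes the packages from the README's Environment setup section are installed:

```
import torch
import intel_extension_for_pytorch  # noqa: F401  # registers torch.xpu
import oneccl_bindings_for_pytorch  # noqa: F401  # registers the 'ccl' backend

assert torch.xpu.is_available(), 'No XPU device visible'
print(f'XPU devices visible: {torch.xpu.device_count()}')
```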