-
Notifications
You must be signed in to change notification settings - Fork 1
/
dist.py
112 lines (81 loc) · 2.78 KB
/
dist.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import os
from typing import List
from typing import Union
import sys
import torch
import torch.distributed as tdist
import torch.multiprocessing as mp
__rank, __local_rank, __world_size, __device = 0, 0, 1, 'cpu'
__initialized = False
def initialized():
return __initialized
def initialize(backend='nccl'):
global __device
if not torch.cuda.is_available():
print(f'[dist initialize] cuda is not available, use cpu instead', file=sys.stderr)
return
elif 'RANK' not in os.environ:
__device = torch.empty(1).cuda().device
print(f'[dist initialize] RANK is not set, use 1 GPU instead', file=sys.stderr)
return
# ref: https://github.com/open-mmlab/mmcv/blob/master/mmcv/runner/dist_utils.py#L29
if mp.get_start_method(allow_none=True) is None:
mp.set_start_method('spawn')
global_rank, num_gpus = int(os.environ['RANK']), torch.cuda.device_count()
local_rank = global_rank % num_gpus
torch.cuda.set_device(local_rank)
tdist.init_process_group(backend=backend)
global __rank, __local_rank, __world_size, __initialized
__local_rank = local_rank
__rank, __world_size = tdist.get_rank(), tdist.get_world_size()
__device = torch.empty(1).cuda().device
__initialized = True
assert tdist.is_initialized(), 'torch.distributed is not initialized!'
def get_rank():
return __rank
def get_local_rank():
return __local_rank
def get_world_size():
return __world_size
def get_device():
return __device
def is_master():
return __rank == 0
def is_local_master():
return __local_rank == 0
def barrier():
if __initialized:
tdist.barrier()
def parallelize(net, syncbn=False):
if syncbn:
net = torch.nn.SyncBatchNorm.convert_sync_batchnorm(net)
net = net.cuda()
net = torch.nn.parallel.DistributedDataParallel(net, device_ids=[get_local_rank()], find_unused_parameters=False, broadcast_buffers=False)
return net
def allreduce(t: torch.Tensor) -> None:
if __initialized:
if not t.is_cuda:
cu = t.detach().cuda()
tdist.all_reduce(cu)
t.copy_(cu.cpu())
else:
tdist.all_reduce(t)
def allgather(t: torch.Tensor, cat=True) -> Union[List[torch.Tensor], torch.Tensor]:
if __initialized:
if not t.is_cuda:
t = t.cuda()
ls = [torch.empty_like(t) for _ in range(__world_size)]
tdist.all_gather(ls, t)
else:
ls = [t]
if cat:
ls = torch.cat(ls, dim=0)
return ls
def broadcast(t: torch.Tensor, src_rank) -> None:
if __initialized:
if not t.is_cuda:
cu = t.detach().cuda()
tdist.broadcast(cu, src=src_rank)
t.copy_(cu.cpu())
else:
tdist.broadcast(t, src=src_rank)