diff --git a/bluefog/torch/__init__.py b/bluefog/torch/__init__.py index eb8a1d08..62695e4f 100644 --- a/bluefog/torch/__init__.py +++ b/bluefog/torch/__init__.py @@ -14,6 +14,10 @@ # limitations under the License. # ============================================================================== +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + import collections import os import torch @@ -26,10 +30,10 @@ DistributedWinPutOptimizer, DistributedAllreduceOptimizer, DistributedNeighborAllreduceOptimizer, - DistributedHierarchicalNeighborAllreduceOptimizer, + DistributedHierarchicalNeighborAllreduceOptimizer ) -check_extension("bluefog.torch", __file__, "mpi_lib") +check_extension('bluefog.torch', __file__, 'mpi_lib') from bluefog.torch.mpi_ops import init, shutdown from bluefog.torch.mpi_ops import size, local_size, rank, local_rank @@ -70,38 +74,4 @@ from bluefog.torch.mpi_ops import timeline_start_activity, timeline_end_activity from bluefog.torch.mpi_ops import timeline_context - -from bluefog.torch.utility import ( - broadcast_optimizer_state, - broadcast_parameters, - allreduce_parameters, -) - -from bluefog.common.topology_util import ( - GetRecvWeights, - GetSendWeights, - IsRegularGraph, - IsTopologyEquivalent, -) - -from bluefog.common.topology_util import ( - ExponentialTwoGraph, - ExponentialGraph, - FullyConnectedGraph, - MeshGrid2DGraph, - RingGraph, - StarGraph, - SymmetricExponentialGraph, -) - -from bluefog.common.topology_util import ( - GetDynamicOnePeerSendRecvRanks, - GetExp2DynamicSendRecvMachineRanks, - GetInnerOuterRingDynamicSendRecvRanks, - GetInnerOuterExpo2DynamicSendRecvRanks, -) - -from bluefog.torch.topology_util import ( - InferSourceFromDestinationRanks, - InferDestinationFromSourceRanks, -) +from bluefog.torch.utility import broadcast_optimizer_state, broadcast_parameters, allreduce_parameters diff --git a/bluefog/torch/topology_util.py b/bluefog/torch/topology_util.py deleted file mode 100644 index c8bff26a..00000000 --- a/bluefog/torch/topology_util.py +++ /dev/null @@ -1,106 +0,0 @@ -from typing import Any, List, Optional, Tuple, Union -import collections - -import numpy as np -import torch -import bluefog.torch as bf - - -def _check_ranks(rank_list: List[Any], self_rank: int, size: int) -> [bool, str]: - for rank in rank_list: - if not isinstance(rank, int): - return False, "contain element that is not integer." - if (rank < 0) or (rank >= size): - return False, "contain element that is not between 0 and size-1." - if len(set(rank_list)) != len(rank_list): - return False, "contain duplicated elements." - if self_rank in rank_list: - return False, "contain self rank." - return True, "" - - -def InferSourceFromDestinationRanks( - dst_ranks: List[int], construct_adjacency_matrix: bool = False, -) -> Union[List[int], Tuple[List[int], np.array]]: - """Infer the source ranks from destination ranks. This is collective communication call. - - Args: - dst_ranks: A list of destination ranks. - construct_adjacency_matrix: If true, adjacency matrix will be return instead. - Element w_{ij} represents the weights sending from node i to node j. - We use column normalized style, i.e. the sum of receiving weight is 1. - - Raises: - ValueError: If dst_ranks or src_ranks does not contain integer from 0 to size-1. - - Returns: - If construct_adjacency_matrix is false, returns the source ranks list. - If construct_adjacency_matrix is true, returns the the source ranks list - and a 2-D numpy array. - """ - is_valid, error_msg = _check_ranks(dst_ranks, bf.rank(), bf.size()) - assert is_valid, f"The format of dst_ranks is wrong: {error_msg}" - return _infer_topo( - dst_ranks, - transpose=False, - construct_adjacency_matrix=construct_adjacency_matrix, - ) - - -def InferDestinationFromSourceRanks( - src_ranks: List[int], construct_adjacency_matrix: bool = False, -) -> Union[List[int], np.array]: - """Infer the destination ranks from source ranks. This is collective communication call. - - Args: - src_ranks: A list of destination ranks. - construct_adjacency_matrix: If true, adjacency matrix will be return instead. - Element w_{ij} represents the weights sending from node i to node j. - We use column normalized style, i.e. the sum of receiving weight is 1. - - Raises: - ValueError: If dst_ranks or src_ranks does not contain integer from 0 to size-1. - - Returns: - If construct_adjacency_matrix is false, returns the destination ranks list. - If construct_adjacency_matrix is true, returns the the sodestinationrce ranks - list and a 2-D numpy array. - """ - is_valid, error_msg = _check_ranks(src_ranks, bf.rank(), bf.size()) - assert is_valid, f"The format of src_ranks is wrong: {error_msg}" - return _infer_topo( - src_ranks, - transpose=True, - construct_adjacency_matrix=construct_adjacency_matrix, - ) - - -def _infer_topo( - rank_list: List[int], transpose: bool, construct_adjacency_matrix: bool -): - degree = len(rank_list) - all_degree_list = bf.allgather(torch.tensor([degree], dtype=torch.int32)).numpy() - all_rank_list = bf.allgather(torch.tensor(rank_list, dtype=torch.int32)).numpy() - adjacency_dict = dict() - displacement = 0 - for i, degree in enumerate(all_degree_list): - adjacency_dict[i] = sorted(all_rank_list[displacement : displacement + degree]) - displacement += degree - - inv_adjacency_dict = collections.defaultdict(list) - for k, adj in adjacency_dict.items(): - for v in adj: - inv_adjacency_dict[v].append(k) - return_list = inv_adjacency_dict.get(bf.rank()) - - if not construct_adjacency_matrix: - return return_list - - # construct_adjacency_matrix - W = np.eye(bf.size()) - for k, adj in adjacency_dict.items(): - W[k, adj] = 1 - if transpose: - W = W.T - - return return_list, W / W.sum(axis=1) diff --git a/bluefog/torch/utility.py b/bluefog/torch/utility.py index 73337207..b4e05238 100644 --- a/bluefog/torch/utility.py +++ b/bluefog/torch/utility.py @@ -14,10 +14,8 @@ # limitations under the License. # ============================================================================== -from typing import Any, List, Optional import collections -import numpy as np import torch import bluefog.torch as bf diff --git a/test/torch_basics_test.py b/test/torch_basics_test.py index 77c6904e..4494f3d2 100644 --- a/test/torch_basics_test.py +++ b/test/torch_basics_test.py @@ -28,18 +28,8 @@ from common import mpi_env_rank_and_size import bluefog.torch as bf -from bluefog.torch import ( - ExponentialGraph, - RingGraph, - StarGraph, - MeshGrid2DGraph, - FullyConnectedGraph, -) -from bluefog.torch import ( - IsTopologyEquivalent, - InferDestinationFromSourceRanks, - InferSourceFromDestinationRanks, -) +from bluefog.common.topology_util import ExponentialGraph, RingGraph, RingGraph +from bluefog.common.topology_util import IsTopologyEquivalent warnings.filterwarnings("ignore", message="numpy.dtype size changed") warnings.filterwarnings("ignore", message="numpy.ufunc size changed") @@ -85,12 +75,10 @@ def test_set_topology_fail_with_win_create(self): if size == 1: expected_topology = nx.from_numpy_array( - np.array([[0.5]]), create_using=nx.DiGraph - ) + np.array([[0.5]]), create_using=nx.DiGraph) elif size == 2: expected_topology = nx.from_numpy_array( - np.array([[0, 0.2], [0.2, 0]]), create_using=nx.DiGraph - ) + np.array([[0, 0.2], [0.2, 0]]), create_using=nx.DiGraph) else: expected_topology = RingGraph(size) @@ -108,16 +96,10 @@ def test_set_and_load_topology(self): bf.init() size = bf.size() if size == 4: - expected_topology = nx.DiGraph( - np.array( - [ - [1 / 3.0, 1 / 3.0, 1 / 3.0, 0.0], - [0.0, 1 / 3.0, 1 / 3.0, 1 / 3.0], - [1 / 3.0, 0.0, 1 / 3.0, 1 / 3.0], - [1 / 3.0, 1 / 3.0, 0.0, 1 / 3.0], - ] - ) - ) + expected_topology = nx.DiGraph(np.array( + [[1/3., 1/3., 1/3., 0.], [0., 1/3., 1/3., 1/3.], + [1/3., 0., 1/3., 1/3.], [1/3., 1/3., 0., 1/3.]] + )) elif size == 1: expected_topology = nx.DiGraph(np.array([[1.0]])) else: @@ -131,13 +113,15 @@ def test_in_out_neighbors_expo2(self): rank = bf.rank() size = bf.size() assert bf.set_topology(ExponentialGraph(size)) - in_neighbors = bf.in_neighbor_ranks() + in_neighobrs = bf.in_neighbor_ranks() out_neighbors = bf.out_neighbor_ranks() degree = int(np.ceil(np.log2(size))) - expected_in_neighbors = sorted([(rank - 2 ** i) % size for i in range(degree)]) - expected_out_neighbors = sorted([(rank + 2 ** i) % size for i in range(degree)]) - assert sorted(in_neighbors) == expected_in_neighbors + expected_in_neighbors = sorted([(rank - 2**i) % + size for i in range(degree)]) + expected_out_neighbors = sorted([(rank + 2**i) % + size for i in range(degree)]) + assert sorted(in_neighobrs) == expected_in_neighbors assert sorted(out_neighbors) == expected_out_neighbors def test_in_out_neighbors_biring(self): @@ -145,67 +129,21 @@ def test_in_out_neighbors_biring(self): rank = bf.rank() size = bf.size() assert bf.set_topology(RingGraph(size)) - in_neighbors = bf.in_neighbor_ranks() + in_neighobrs = bf.in_neighbor_ranks() out_neighbors = bf.out_neighbor_ranks() - expected_in_neighbors = list(set(map(lambda x: x % size, [rank - 1, rank + 1]))) - expected_out_neighbors = list( - set(map(lambda x: x % size, [rank - 1, rank + 1])) - ) + expected_in_neighbors = list(set( + map(lambda x: x % size, [rank - 1, rank + 1]))) + expected_out_neighbors = list(set( + map(lambda x: x % size, [rank - 1, rank + 1]))) if size <= 1: expected_in_neighbors = [] expected_out_neighbors = [] - assert sorted(in_neighbors) == expected_in_neighbors + assert sorted(in_neighobrs) == expected_in_neighbors assert sorted(out_neighbors) == expected_out_neighbors -@pytest.mark.parametrize( - "topo_func", - [ExponentialGraph, RingGraph, StarGraph, MeshGrid2DGraph, FullyConnectedGraph], -) -def test_infer_destination_from_source_ranks(topo_func): - bf.init() - size = bf.size() - bf.set_topology(topo_func(size)) - topo = bf.load_topology() - in_neighbors = bf.in_neighbor_ranks() - out_neighbors = bf.out_neighbor_ranks() - - # Make the W into average rule. - expected_W = (nx.to_numpy_array(topo) > 0).astype(float) - expected_W /= expected_W.sum(axis=0) - - src_ranks, W = InferDestinationFromSourceRanks( - src_ranks=in_neighbors, construct_adjacency_matrix=True - ) - assert sorted(src_ranks) == out_neighbors - np.testing.assert_allclose(W, expected_W) - - -@pytest.mark.parametrize( - "topo_func", - [ExponentialGraph, RingGraph, StarGraph, MeshGrid2DGraph, FullyConnectedGraph], -) -def test_infer_source_from_destination_ranks(topo_func): - bf.init() - size = bf.size() - bf.set_topology(topo_func(size)) - topo = bf.load_topology() - in_neighbors = bf.in_neighbor_ranks() - out_neighbors = bf.out_neighbor_ranks() - - # Make the W into average rule. - expected_W = (nx.to_numpy_array(topo) > 0).astype(float) - expected_W /= expected_W.sum(axis=0) - - dst_ranks, W = InferSourceFromDestinationRanks( - dst_ranks=out_neighbors, construct_adjacency_matrix=True - ) - assert sorted(dst_ranks) == in_neighbors - np.testing.assert_allclose(W, expected_W) - - if __name__ == "__main__": unittest.main()