Skip to content

Commit

Permalink
Revert "Topo service (#75)" (#85)
Browse files Browse the repository at this point in the history
This reverts commit 764b601.
  • Loading branch information
BichengYing authored Mar 29, 2021
1 parent 764b601 commit dcf56ba
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 227 deletions.
44 changes: 7 additions & 37 deletions bluefog/torch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@
# limitations under the License.
# ==============================================================================

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import collections
import os
import torch
Expand All @@ -26,10 +30,10 @@
DistributedWinPutOptimizer,
DistributedAllreduceOptimizer,
DistributedNeighborAllreduceOptimizer,
DistributedHierarchicalNeighborAllreduceOptimizer,
DistributedHierarchicalNeighborAllreduceOptimizer
)

check_extension("bluefog.torch", __file__, "mpi_lib")
check_extension('bluefog.torch', __file__, 'mpi_lib')

from bluefog.torch.mpi_ops import init, shutdown
from bluefog.torch.mpi_ops import size, local_size, rank, local_rank
Expand Down Expand Up @@ -70,38 +74,4 @@

from bluefog.torch.mpi_ops import timeline_start_activity, timeline_end_activity
from bluefog.torch.mpi_ops import timeline_context

from bluefog.torch.utility import (
broadcast_optimizer_state,
broadcast_parameters,
allreduce_parameters,
)

from bluefog.common.topology_util import (
GetRecvWeights,
GetSendWeights,
IsRegularGraph,
IsTopologyEquivalent,
)

from bluefog.common.topology_util import (
ExponentialTwoGraph,
ExponentialGraph,
FullyConnectedGraph,
MeshGrid2DGraph,
RingGraph,
StarGraph,
SymmetricExponentialGraph,
)

from bluefog.common.topology_util import (
GetDynamicOnePeerSendRecvRanks,
GetExp2DynamicSendRecvMachineRanks,
GetInnerOuterRingDynamicSendRecvRanks,
GetInnerOuterExpo2DynamicSendRecvRanks,
)

from bluefog.torch.topology_util import (
InferSourceFromDestinationRanks,
InferDestinationFromSourceRanks,
)
from bluefog.torch.utility import broadcast_optimizer_state, broadcast_parameters, allreduce_parameters
106 changes: 0 additions & 106 deletions bluefog/torch/topology_util.py

This file was deleted.

2 changes: 0 additions & 2 deletions bluefog/torch/utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,8 @@
# limitations under the License.
# ==============================================================================

from typing import Any, List, Optional
import collections

import numpy as np
import torch
import bluefog.torch as bf

Expand Down
102 changes: 20 additions & 82 deletions test/torch_basics_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,8 @@

from common import mpi_env_rank_and_size
import bluefog.torch as bf
from bluefog.torch import (
ExponentialGraph,
RingGraph,
StarGraph,
MeshGrid2DGraph,
FullyConnectedGraph,
)
from bluefog.torch import (
IsTopologyEquivalent,
InferDestinationFromSourceRanks,
InferSourceFromDestinationRanks,
)
from bluefog.common.topology_util import ExponentialGraph, RingGraph, RingGraph
from bluefog.common.topology_util import IsTopologyEquivalent

warnings.filterwarnings("ignore", message="numpy.dtype size changed")
warnings.filterwarnings("ignore", message="numpy.ufunc size changed")
Expand Down Expand Up @@ -85,12 +75,10 @@ def test_set_topology_fail_with_win_create(self):

if size == 1:
expected_topology = nx.from_numpy_array(
np.array([[0.5]]), create_using=nx.DiGraph
)
np.array([[0.5]]), create_using=nx.DiGraph)
elif size == 2:
expected_topology = nx.from_numpy_array(
np.array([[0, 0.2], [0.2, 0]]), create_using=nx.DiGraph
)
np.array([[0, 0.2], [0.2, 0]]), create_using=nx.DiGraph)
else:
expected_topology = RingGraph(size)

Expand All @@ -108,16 +96,10 @@ def test_set_and_load_topology(self):
bf.init()
size = bf.size()
if size == 4:
expected_topology = nx.DiGraph(
np.array(
[
[1 / 3.0, 1 / 3.0, 1 / 3.0, 0.0],
[0.0, 1 / 3.0, 1 / 3.0, 1 / 3.0],
[1 / 3.0, 0.0, 1 / 3.0, 1 / 3.0],
[1 / 3.0, 1 / 3.0, 0.0, 1 / 3.0],
]
)
)
expected_topology = nx.DiGraph(np.array(
[[1/3., 1/3., 1/3., 0.], [0., 1/3., 1/3., 1/3.],
[1/3., 0., 1/3., 1/3.], [1/3., 1/3., 0., 1/3.]]
))
elif size == 1:
expected_topology = nx.DiGraph(np.array([[1.0]]))
else:
Expand All @@ -131,81 +113,37 @@ def test_in_out_neighbors_expo2(self):
rank = bf.rank()
size = bf.size()
assert bf.set_topology(ExponentialGraph(size))
in_neighbors = bf.in_neighbor_ranks()
in_neighobrs = bf.in_neighbor_ranks()
out_neighbors = bf.out_neighbor_ranks()

degree = int(np.ceil(np.log2(size)))
expected_in_neighbors = sorted([(rank - 2 ** i) % size for i in range(degree)])
expected_out_neighbors = sorted([(rank + 2 ** i) % size for i in range(degree)])
assert sorted(in_neighbors) == expected_in_neighbors
expected_in_neighbors = sorted([(rank - 2**i) %
size for i in range(degree)])
expected_out_neighbors = sorted([(rank + 2**i) %
size for i in range(degree)])
assert sorted(in_neighobrs) == expected_in_neighbors
assert sorted(out_neighbors) == expected_out_neighbors

def test_in_out_neighbors_biring(self):
bf.init()
rank = bf.rank()
size = bf.size()
assert bf.set_topology(RingGraph(size))
in_neighbors = bf.in_neighbor_ranks()
in_neighobrs = bf.in_neighbor_ranks()
out_neighbors = bf.out_neighbor_ranks()

expected_in_neighbors = list(set(map(lambda x: x % size, [rank - 1, rank + 1])))
expected_out_neighbors = list(
set(map(lambda x: x % size, [rank - 1, rank + 1]))
)
expected_in_neighbors = list(set(
map(lambda x: x % size, [rank - 1, rank + 1])))
expected_out_neighbors = list(set(
map(lambda x: x % size, [rank - 1, rank + 1])))

if size <= 1:
expected_in_neighbors = []
expected_out_neighbors = []

assert sorted(in_neighbors) == expected_in_neighbors
assert sorted(in_neighobrs) == expected_in_neighbors
assert sorted(out_neighbors) == expected_out_neighbors


@pytest.mark.parametrize(
"topo_func",
[ExponentialGraph, RingGraph, StarGraph, MeshGrid2DGraph, FullyConnectedGraph],
)
def test_infer_destination_from_source_ranks(topo_func):
bf.init()
size = bf.size()
bf.set_topology(topo_func(size))
topo = bf.load_topology()
in_neighbors = bf.in_neighbor_ranks()
out_neighbors = bf.out_neighbor_ranks()

# Make the W into average rule.
expected_W = (nx.to_numpy_array(topo) > 0).astype(float)
expected_W /= expected_W.sum(axis=0)

src_ranks, W = InferDestinationFromSourceRanks(
src_ranks=in_neighbors, construct_adjacency_matrix=True
)
assert sorted(src_ranks) == out_neighbors
np.testing.assert_allclose(W, expected_W)


@pytest.mark.parametrize(
"topo_func",
[ExponentialGraph, RingGraph, StarGraph, MeshGrid2DGraph, FullyConnectedGraph],
)
def test_infer_source_from_destination_ranks(topo_func):
bf.init()
size = bf.size()
bf.set_topology(topo_func(size))
topo = bf.load_topology()
in_neighbors = bf.in_neighbor_ranks()
out_neighbors = bf.out_neighbor_ranks()

# Make the W into average rule.
expected_W = (nx.to_numpy_array(topo) > 0).astype(float)
expected_W /= expected_W.sum(axis=0)

dst_ranks, W = InferSourceFromDestinationRanks(
dst_ranks=out_neighbors, construct_adjacency_matrix=True
)
assert sorted(dst_ranks) == in_neighbors
np.testing.assert_allclose(W, expected_W)


if __name__ == "__main__":
unittest.main()

0 comments on commit dcf56ba

Please sign in to comment.