Commit

kvstore.row_sparse_pull for GPU and end-to-end benchmark: CPU vs. multi-GPUs (apache#150)

* Add gpu support for BroadcastRowSparse

* Fix bugs

* Add benchmark script

* Increase output dim size

* Update weight on CPU using single GPU for sparse tensors

* More fix

* Optimize sparse_retain for special case

* Change row sparse pull locations

* Avoid sparse retain on cpu if possible

* Use acc for metric

* Fix misc
reminisce authored and eric-haibin-lin committed Aug 15, 2017
1 parent 6b0cac1 commit eeff444
Showing 8 changed files with 445 additions and 33 deletions.
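
For orientation before the diffs: the call pattern this commit benchmarks is sketched below. This is a minimal illustration, not part of the commit; it assumes the post-1.0 sparse NDArray API (mx.nd.zeros(..., stype='row_sparse')), and the key name 'w', the shapes, and the row ids are made up.

import mxnet as mx

# Hypothetical key name, shapes, and row ids, purely for illustration.
kv = mx.kvstore.create('local')
weight = mx.nd.zeros((10, 4), stype='row_sparse')
kv.init('w', weight)

# Single device: one destination array, one NDArray of row ids.
row_ids = mx.nd.array([0, 3, 7], dtype='int64')
kv.row_sparse_pull('w', weight, row_ids=row_ids)

# Multiple devices (e.g. one per GPU): a list of destinations and a
# matching list of per-device row ids, as the benchmark below does.
weights = [mx.nd.zeros((10, 4), stype='row_sparse') for _ in range(2)]
per_dev_ids = [mx.nd.array([0, 3], dtype='int64'), mx.nd.array([5, 7], dtype='int64')]
kv.row_sparse_pull('w', weights, row_ids=per_dev_ids)
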
226 changes: 226 additions & 0 deletions benchmark/python/sparse_end2end.py
@@ -0,0 +1,226 @@
from mxnet.test_utils import *
import time
import argparse
import os

parser = argparse.ArgumentParser(description="Run sparse linear regression " \
                                             "with distributed kvstore",
                                 formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--profiler', type=int, default=0,
                    help='whether to use profiler')
parser.add_argument('--num-epoch', type=int, default=1,
                    help='number of epochs to train')
parser.add_argument('--batch-size', type=int, default=512,
                    help='number of examples per batch')
parser.add_argument('--num-batch', type=int, default=99999999,
                    help='number of batches per epoch')
parser.add_argument('--dummy-iter', type=int, default=0,
                    help='whether to use dummy iterator to exclude io cost')
parser.add_argument('--kvstore', type=str, default='local',
                    help='what kvstore to use [local, dist_sync, etc]')
parser.add_argument('--log-level', type=str, default='debug',
                    help='logging level [debug, info, error]')
parser.add_argument('--dataset', type=str, default='avazu',
                    help='what test dataset to use')
parser.add_argument('--num-gpu', type=int, default=0,
                    help='number of gpus to use. 0 means using cpu(0);'
                         'otherwise, use gpu(0),...,gpu(num_gpu-1)')
parser.add_argument('--output-dim', type=int, default=4,
                    help='number of columns of the forward output')


def get_libsvm_data(data_dir, data_name, url, data_origin_name):
    if not os.path.isdir(data_dir):
        os.system("mkdir " + data_dir)
    os.chdir(data_dir)
    if (not os.path.exists(data_name)):
        import urllib
        zippath = os.path.join(data_dir, data_origin_name)
        urllib.urlretrieve(url, zippath)
        os.system("bzip2 -d %r" % data_origin_name)
    os.chdir("..")


class DummyIter(mx.io.DataIter):
    "A dummy iterator that always return the same batch, used for speed testing"
    def __init__(self, real_iter):
        super(DummyIter, self).__init__()
        self.real_iter = real_iter
        self.provide_data = real_iter.provide_data
        self.provide_label = real_iter.provide_label
        self.batch_size = real_iter.batch_size

        for batch in real_iter:
            self.the_batch = batch
            break

    def __iter__(self):
        return self

    def next(self):
        return self.the_batch

# testing dataset sources
avazu = {
    'data_name': 'avazu-app.t',
    'data_origin_name': 'avazu-app.t.bz2',
    'url': "https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/avazu-app.t.bz2",
    'feature_dim': 1000000,
}

kdda = {
    'data_name': 'kdda.t',
    'data_origin_name': 'kdda.t.bz2',
    'url': "https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/kdda.t.bz2",
    'feature_dim': 20216830,
}

datasets = { 'kdda' : kdda, 'avazu' : avazu }


def get_sym(feature_dim):
    x = mx.symbol.Variable("data", stype='csr')
    norm_init = mx.initializer.Normal(sigma=0.01)
    w = mx.symbol.Variable("w", shape=(feature_dim, args.output_dim), init=norm_init, stype='row_sparse')
    embed = mx.symbol.dot(x, w)
    y = mx.symbol.Variable("softmax_label")
    model = mx.symbol.SoftmaxOutput(data=embed, label=y, name="out")
    return model


def row_sparse_pull(kv, key, data, slices, weight_array, priority):
    # if have kvstore, need to pull corresponding rows of
    # the weights to each context
    # column indices (NDArray type) of the csr data
    # used as the row_idx of the weight row-sparse matrix
    row_indices = data.indices
    if len(slices) == 1:
        kv.row_sparse_pull(key, weight_array, priority=priority, row_ids=row_indices)
    else:  # more than one slices, multi-GPU training. Need to retain weight rows according to data slices
        # TODO(junwu):
        # the following line blocks, may need to pre-compute
        # and cache it outside the for loop
        indptr = data.indptr.asnumpy()
        row_idx_array = []
        for s in slices:
            row_idx_array.append(row_indices[indptr[s.start]:indptr[s.stop]])
        kv.row_sparse_pull(key, weight_array, priority=priority, row_ids=row_idx_array)


if __name__ == '__main__':

    # arg parser
    args = parser.parse_args()
    num_epoch = args.num_epoch
    num_batch = args.num_batch
    kvstore = args.kvstore
    profiler = args.profiler > 0
    batch_size = args.batch_size if args.num_gpu == 0 else args.num_gpu * args.batch_size
    dummy_iter = args.dummy_iter
    dataset = args.dataset
    log_level = args.log_level
    contexts = mx.context.cpu(0) if args.num_gpu < 1\
        else [mx.context.gpu(i) for i in range(args.num_gpu)]

    # create kvstore when there are gpus
    kv = mx.kvstore.create(kvstore) if args.num_gpu >= 1 else None
    rank = kv.rank if kv is not None else 0
    num_worker = kv.num_workers if kv is not None else 1

    # only print log for rank 0 worker
    import logging
    if rank != 0:
        log_level = logging.ERROR
    elif log_level == 'DEBUG':
        log_level = logging.DEBUG
    else:
        log_level = logging.INFO
    head = '%(asctime)-15s %(message)s'
    logging.basicConfig(level=log_level, format=head)

    # dataset
    assert(dataset in datasets), "unknown dataset " + dataset
    metadata = datasets[dataset]
    feature_dim = metadata['feature_dim']
    if logging:
        logging.debug('preparing data ... ')
    data_dir = os.path.join(os.getcwd(), 'data')
    path = os.path.join(data_dir, metadata['data_name'])
    if not os.path.exists(path):
        get_libsvm_data(data_dir, metadata['data_name'], metadata['url'],
                        metadata['data_origin_name'])
        assert os.path.exists(path)

    # data iterator
    train_data = mx.io.LibSVMIter(data_libsvm=path, data_shape=(feature_dim,),
                                  batch_size=batch_size, num_parts=num_worker,
                                  part_index=rank)
    if dummy_iter:
        train_data = DummyIter(train_data)

    # model
    model = get_sym(feature_dim)

    # module
    mod = mx.mod.Module(symbol=model, data_names=['data'],
                        label_names=['softmax_label'], context=contexts)
    mod.bind(data_shapes=train_data.provide_data, label_shapes=train_data.provide_label)
    mod.init_params(initializer=mx.init.Uniform(scale=.1))
    sgd = mx.optimizer.SGD(momentum=0.0, clip_gradient=5.0,
                           learning_rate=0.1, rescale_grad=1.0/batch_size/num_worker)
    mod.init_optimizer(optimizer=sgd, kvstore=kv)
    # use accuracy as the metric
    metric = mx.metric.create('acc')

    index = mod._exec_group.param_names.index('w')
    # weight_array bound to executors of the contexts
    weight_array = mod._exec_group.param_arrays[index]

    # start profiler
    if profiler:
        device = 'cpu'
        if args.num_gpu > 0:
            device = 'gpu' + str(args.num_gpu)
        name = 'profile_' + args.dataset + '_' + device + '_nworker' + str(num_worker)\
               + '_batchsize' + str(args.batch_size) + '_outdim' + str(args.output_dim) + '.json'
        mx.profiler.profiler_set_config(mode='all', filename=name)
        mx.profiler.profiler_set_state('run')

    logging.debug('start training ...')
    start = time.time()
    data_iter = iter(train_data)
    for epoch in range(num_epoch):
        nbatch = 0
        end_of_batch = False
        data_iter.reset()
        metric.reset()
        next_batch = next(data_iter)
        if kv is not None:
            row_sparse_pull(kv, 'w', next_batch.data[0], mod._exec_group.slices, weight_array, -index)
        while not end_of_batch:
            nbatch += 1
            batch = next_batch

            mod.forward_backward(batch)
            # update parameters
            mod.update()

            try:
                # pre fetch next batch
                next_batch = next(data_iter)
                if nbatch == num_batch:
                    raise StopIteration
                if kv is not None:
                    row_sparse_pull(kv, 'w', next_batch.data[0], mod._exec_group.slices, weight_array, -index)
            except StopIteration:
                end_of_batch = True
            # accumulate prediction accuracy
            mod.update_metric(metric, batch.label)
        logging.info('epoch %d, %s' % (epoch, metric.get()))
        if epoch == 0:
            print "num_batches = ", nbatch
    if profiler:
        mx.profiler.profiler_set_state('stop')
    end = time.time()
    time_cost = end - start
    logging.info('num_worker = ' + str(num_worker) + ', time cost = ' + str(time_cost))
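
The multi-GPU branch of the row_sparse_pull helper above slices the CSR batch's column indices per device slice. Below is a small self-contained sketch of that slicing with toy data that is not from the benchmark (the real script takes its slices from mod._exec_group.slices); it assumes the NDArray.tostype conversion available in MXNet 1.x.

import mxnet as mx

# Toy CSR batch standing in for next_batch.data[0]; the values are arbitrary.
csr = mx.nd.array([[0, 1, 0, 2],
                   [0, 0, 3, 0],
                   [4, 0, 0, 0],
                   [0, 5, 6, 0]]).tostype('csr')
slices = [slice(0, 2), slice(2, 4)]   # two device slices over the batch rows
col_indices = csr.indices             # column ids double as weight row ids
indptr = csr.indptr.asnumpy()
row_ids_per_dev = [col_indices[int(indptr[s.start]):int(indptr[s.stop])]
                   for s in slices]
print([ids.asnumpy() for ids in row_ids_per_dev])
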
6 changes: 6 additions & 0 deletions include/mxnet/ndarray.h
@@ -226,6 +226,12 @@ class NDArray {
     return ptr_->aux_shapes;
   }
 
+  /*! returns the dtypes of all aux data */
+  const std::vector<int>& aux_types() const {
+    CHECK(storage_type() != kDefaultStorage);
+    return ptr_->aux_types;
+  }
+
   /*!
    * \brief For a sparse operation on a csr matrix for example,
    * the size of the column index array
4 changes: 3 additions & 1 deletion src/executor/graph_executor.cc
@@ -1151,7 +1151,9 @@ void GraphExecutor::InitDataEntryMemory(std::vector<NDArray>* shared_pool) {
     CHECK_LE(nword, std::numeric_limits<nnvm::dim_t>::max());
     // allocate float arrays
     TShape shape{static_cast<nnvm::dim_t>(nword)};
-    NDArray nd(shape, ctx);
+    // TODO(junwu): adding delay_alloc=true to create nd
+    // is a temporary solution.
+    NDArray nd(shape, ctx, true);
     data_pool_[i] = nd;
     // put the new allocated arrays to shared pool
     if (shared_pool != nullptr) {
110 changes: 92 additions & 18 deletions src/kvstore/comm.h
@@ -112,9 +112,9 @@ class CommCPU : public Comm {
     // avoid extra copy for single device, but it may bring problems for
     // abnormal usage of kvstore
     if (src.size() == 1) {
-      if (src[0].storage_type() == buf.merged.storage_type()) {
+      if (src[0].storage_type() == kDefaultStorage) {
         return src[0];
-      } else {
+      } else {  // if sparse and only one GPU, always update weight on CPU
         CopyFromTo(src[0], &buf.merged, priority);
         return buf.merged;
       }
@@ -188,39 +188,113 @@
     }
   }
 
-  // TODO(haibin) support broadcast row_sparse on GPU
   void BroadcastRowSparse(int key, const NDArray& src,
                           const std::vector<std::pair<NDArray*, NDArray>>& dst,
                           const bool use_copy,
                           const int priority) override {
     using namespace mshadow;
-    auto size = dst.size();
-    for (size_t i = 0; i < size; i++) {
-      auto out = dst[i].first;
-      auto row_id = dst[i].second;
+    CHECK_EQ(src.storage_type(), kRowSparseStorage)
+      << "BroadcastRowSparse expects row-sparse src NDArray";
+    CHECK_EQ(src.ctx().dev_mask(), Context::kCPU)
+      << "BroadcastRowSparse with src on gpu context not supported";
+    for (size_t i = 0; i < dst.size(); ++i) {
+      NDArray* out = dst[i].first;
+      NDArray row_id = dst[i].second;
       if (use_copy) {
         CopyFromTo(src, out, priority);
       } else {
         CHECK_EQ(out->storage_type(), kRowSparseStorage)
           << "BroadcastRowSparse expects row_sparse dst NDArray";
-        CHECK_EQ(out->ctx().dev_mask(), Context::kCPU)
-          << "BroadcastRowSparse with dst on gpu context not supported";
         CHECK_EQ(row_id.ctx().dev_mask(), Context::kCPU)
-          << "BroadcastRowSparse with src on gpu context not supported";
+          << "BroadcastRowSparse with row_indices on gpu context not supported";
         // retain according to unique indices
-        Engine::Get()->PushSync([src, out, row_id](RunContext rctx) {
-            NDArray *output = out;
-            const auto indices = row_id.data();
-            op::SparseRetainOpForwardRspImpl<cpu>(rctx.get_stream<cpu>(),
-                                                  src, indices, kWriteTo,
-                                                  output);
-          }, Context::CPU(), {src.var(), row_id.var()}, {out->var()},
-          FnProperty::kNormal, priority, PROFILER_MESSAGE("KVStoreSparseRetain"));
+        const bool use_sparse_retain = (src.shape()[0] != src.storage_shape()[0])
+            || (row_id.dtype() != out->aux_type(rowsparse::kIdx))
+            || (out->ctx().dev_mask() != Context::kGPU);
+        if (use_sparse_retain) {  // use sparse_retain op
+          const bool is_to_gpu = out->ctx().dev_mask() == Context::kGPU;
+          NDArray out_cpu = is_to_gpu? NDArray(kRowSparseStorage, src.shape(),
+              src.ctx(), true, src.dtype(), src.aux_types()) : *out;
+          Engine::Get()->PushSync([=](RunContext rctx) {
+              const TBlob& indices = row_id.data();
+              NDArray temp = out_cpu;  // get rid of const qualifier
+              op::SparseRetainOpForwardRspImpl<cpu>(rctx.get_stream<cpu>(),
+                                                    src, indices, kWriteTo,
+                                                    &temp);
+            }, Context::CPU(), {src.var(), row_id.var()}, {out_cpu.var()},
+            FnProperty::kNormal, priority, PROFILER_MESSAGE("KVStoreSparseRetain"));
+          if (is_to_gpu) {
+            CopyFromTo(out_cpu, out, priority);
+          }
+        } else {  // direct copy rows
+          Engine::Get()->PushSync([=](RunContext rctx) {
+              CopyRetainedRowsToGPU(rctx.get_stream<cpu>(), rctx.get_stream<gpu>(),
+                                    src, row_id, out);
+            }, out->ctx(), {src.var(), row_id.var()}, {out->var()},
+            FnProperty::kCopyToGPU, priority, PROFILER_MESSAGE("KVStoreCopyRetainedRowsToGPU"));
+        }
       }
     }
   }
 
  private:
+  /*!
+   * \brief When src is a rsp with full rows,
+   * simply copy retained rows directly from cpu to gpu
+   * without invoking sparse_retain op.
+   */
+  void CopyRetainedRowsToGPU(mshadow::Stream<cpu>* cpu_stream,
+                             mshadow::Stream<gpu>* gpu_stream,
+                             const NDArray& src,
+                             const NDArray& indices,
+                             NDArray* dst) {
+#if MXNET_USE_CUDA == 1
+    CHECK_EQ(src.storage_type(), kRowSparseStorage)
+      << "CopyRetainedRowsToGPU expects row-sparse src NDArray";
+    CHECK_EQ(src.ctx().dev_mask(), Context::kCPU)
+      << "CopyRetainedRowsToGPU with src on gpu context not supported";
+    CHECK_EQ(src.storage_shape()[0], src.shape()[0])
+      << "CopyRetainedRowsToGPU only supports src rsp with full rows";
+    CHECK_EQ(indices.storage_type(), kDefaultStorage);
+    CHECK_EQ(indices.ctx().dev_mask(), Context::kCPU);
+    CHECK_EQ(dst->storage_type(), kRowSparseStorage);
+    CHECK_EQ(dst->ctx().dev_mask(), Context::kGPU);
+    CHECK_EQ(indices.dtype(), dst->aux_type(rowsparse::kIdx))
+      << "CopyRetainedRowsToGPU only supports same data type for idx array and dst aux_data(0)";
+    if (!src.storage_initialized() || indices.data().Size() == 0U) {
+      op::FillZerosRspImpl(gpu_stream, dst);
+      return;
+    }
+    using namespace mshadow;
+
+    const TBlob& src_data = src.data();
+    const TBlob& idx_data = indices.data();
+    const size_t row_length = src.shape().ProdShape(1, src.shape().ndim());
+    const size_t num_rows_retained = idx_data.Size();
+    dst->CheckAndAlloc({Shape1(num_rows_retained)});
+    TBlob dst_data = dst->data();
+    TBlob dst_idx_data = dst->aux_data(rowsparse::kIdx);
+    MSHADOW_TYPE_SWITCH(src.dtype(), DType, {
+      MSHADOW_IDX_TYPE_SWITCH(indices.dtype(), IType, {
+        // copy idx array
+        Tensor<gpu, 1, IType> dst_idx_tensor = dst_idx_data.FlatTo1D<gpu, IType>(gpu_stream);
+        const Tensor<cpu, 1, IType> idx_tensor = idx_data.FlatTo1D<cpu, IType>(cpu_stream);
+        Copy(dst_idx_tensor, idx_tensor, gpu_stream);
+        // copy src data
+        const Tensor<cpu, 2, DType> src_data_tensor = src_data.get_with_shape<cpu, 2, DType>(
+            Shape2(src_data.shape_[0], row_length), cpu_stream);
+        Tensor<gpu, 2, DType> dst_data_tensor = dst_data.get_with_shape<gpu, 2, DType>(
+            Shape2(dst_data.shape_[0], row_length), gpu_stream);
+        for (size_t i = 0; i < num_rows_retained; ++i) {
+          Copy(dst_data_tensor[i], src_data_tensor[idx_tensor[i]], gpu_stream);
+        }
+      })
+    })
+#else
+    LOG(FATAL) << "GPU not enabled";
+#endif
+  }
+
   // reduce sum into val[0]
   inline void ReduceSumCPU(const std::vector<NDArray> &in_data) {
     MSHADOW_TYPE_SWITCH(in_data[0].dtype(), DType, {
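
A note on the new dispatch in BroadcastRowSparse above: the sparse_retain path is used unless the source holds all rows, the row-id dtype matches the destination's index dtype, and the destination lives on a GPU, in which case the retained rows are copied to the GPU directly. The Python sketch below only illustrates what the retain step computes; it assumes the operator is exposed as mx.nd.sparse.retain (the name used in later MXNet releases) and uses made-up shapes and values.

import mxnet as mx
import numpy as np

# Toy row-sparse weight: rows 0, 2, 5 are stored, the rest are zero.
dense_np = np.zeros((6, 3), dtype='float32')
dense_np[[0, 2, 5]] = [[1], [2], [3]]
rsp = mx.nd.array(dense_np).tostype('row_sparse')

# Keep only the rows needed for the current batch, as the kvstore does
# before broadcasting the weight to each worker/device.
wanted = mx.nd.array([2, 5], dtype='int64')
retained = mx.nd.sparse.retain(rsp, wanted)
print(retained.indices.asnumpy())   # -> [2 5]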
