Skip to content

Commit

Permalink
Add IP Port kvs_attr and Kmeans, PCA test cases validated
Browse files Browse the repository at this point in the history
  • Loading branch information
xwu99 committed Jan 26, 2021
1 parent ac216f4 commit 99b9551
Show file tree
Hide file tree
Showing 13 changed files with 126 additions and 82 deletions.
32 changes: 32 additions & 0 deletions mllib-dal/build.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#!/usr/bin/env bash

# Build script for mllib-dal.
# Verifies that the environment variables needed by the native build are set,
# prints the build environment, then runs the Maven package build (tests skipped).

# Check envs for building.
# ${!name} is bash indirect expansion: it yields the value of the variable
# whose name is stored in $name, so the message can print the variable NAME
# (the original printed the empty $JAVA_HOME value instead of its name).
for name in JAVA_HOME DAALROOT TBBROOT CCL_ROOT; do
  if [[ -z "${!name}" ]]; then
    echo "$name not defined!"
    exit 1
  fi
done

echo "=== Building Environments ==="
echo "JAVA_HOME=$JAVA_HOME"
echo "DAALROOT=$DAALROOT"
echo "TBBROOT=$TBBROOT"
echo "CCL_ROOT=$CCL_ROOT"
echo "GCC Version: $(gcc -dumpversion)"
echo "============================="

mvn -DskipTests clean package
16 changes: 11 additions & 5 deletions mllib-dal/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -218,10 +218,12 @@
<resource>
<directory>${env.CCL_ROOT}/lib</directory>
<includes>
<include>libpmi.so.1</include>
<include>libresizable_pmi.so.1</include>
<!--<include>libpmi.so.1</include>-->
<!--<include>libresizable_pmi.so.1</include>-->
<include>libmpi.so.12.0.0</include>
<include>libfabric.so.1</include>
<include>libccl_atl_ofi.so.1</include>
<include>libccl.so</include>
<!--<include>libccl_atl_ofi.so.1</include>-->
</includes>
</resource>
<resource>
Expand Down Expand Up @@ -271,9 +273,13 @@
<destinationFile>${project.build.testOutputDirectory}/lib/libtbbmalloc.so.2</destinationFile>
</fileSet>
<fileSet>
<sourceFile>${project.build.testOutputDirectory}/lib/libccl_atl_ofi.so.1</sourceFile>
<destinationFile>${project.build.testOutputDirectory}/lib/libccl_atl_ofi.so</destinationFile>
<sourceFile>${project.build.testOutputDirectory}/lib/libmpi.so.12.0.0</sourceFile>
<destinationFile>${project.build.testOutputDirectory}/lib/libmpi.so.12</destinationFile>
</fileSet>
<!--<fileSet>-->
<!--<sourceFile>${project.build.testOutputDirectory}/lib/libccl_atl_ofi.so.1</sourceFile>-->
<!--<destinationFile>${project.build.testOutputDirectory}/lib/libccl_atl_ofi.so</destinationFile>-->
<!--</fileSet>-->
</fileSets>
</configuration>
</execution>
Expand Down
13 changes: 4 additions & 9 deletions mllib-dal/src/assembly/assembly.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,26 +58,21 @@
</file>
<!-- Include oneCCL libraries into JAR -->
<file>
<source>${env.CCL_ROOT}/lib/libpmi.so.1</source>
<source>${env.CCL_ROOT}/lib/libfabric.so.1</source>
<outputDirectory>lib</outputDirectory>
</file>
<file>
<source>${env.CCL_ROOT}/lib/libresizable_pmi.so.1</source>
<source>${env.CCL_ROOT}/lib/libmpi.so.12.0.0</source>
<outputDirectory>lib</outputDirectory>
<destName>libmpi.so.12</destName>
</file>
<file>
<source>${env.CCL_ROOT}/lib//libfabric.so.1</source>
<source>${env.CCL_ROOT}/lib/libccl.so</source>
<outputDirectory>lib</outputDirectory>
</file>
<file>
<source>${env.CCL_ROOT}/lib/prov/libsockets-fi.so</source>
<outputDirectory>lib</outputDirectory>
</file>
<file>
<!-- Should rename to XXX.so for ATL to load -->
<source>${env.CCL_ROOT}/lib/libccl_atl_ofi.so.1</source>
<outputDirectory>lib</outputDirectory>
<destName>libccl_atl_ofi.so</destName>
</file>
</files>
</assembly>
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,10 @@ public static synchronized void loadLibraries() throws IOException {
* Load oneCCL libs in dependency order
*/
public static synchronized void loadLibCCL() throws IOException {
loadFromJar(subDir, "libpmi.so.1");
loadFromJar(subDir, "libresizable_pmi.so.1");
loadFromJar(subDir, "libfabric.so.1");
loadFromJar(subDir, "libmpi.so.12");
loadFromJar(subDir, "libccl.so");
loadFromJar(subDir, "libsockets-fi.so");
loadFromJar(subDir, "libccl_atl_ofi.so");
}

/**
Expand Down
8 changes: 4 additions & 4 deletions mllib-dal/src/main/native/KMeansDALImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,8 @@ JNIEXPORT jlong JNICALL Java_org_apache_spark_ml_clustering_KMeansDALImpl_cKMean
jint executor_num, jint executor_cores,
jobject resultObj) {

ccl::communicator *comm = getComm();
size_t rankId = comm->rank();
ccl::communicator &comm = getComm();
size_t rankId = comm.rank();

NumericTablePtr pData = *((NumericTablePtr *)pNumTabData);
NumericTablePtr centroids = *((NumericTablePtr *)pNumTabCenters);
Expand All @@ -184,14 +184,14 @@ JNIEXPORT jlong JNICALL Java_org_apache_spark_ml_clustering_KMeansDALImpl_cKMean
for (it = 0; it < iteration_num && !converged; it++) {
auto t1 = std::chrono::high_resolution_clock::now();

newCentroids = kmeans_compute(rankId, *comm, pData, centroids, cluster_num, executor_num, totalCost);
newCentroids = kmeans_compute(rankId, comm, pData, centroids, cluster_num, executor_num, totalCost);

if (rankId == ccl_root) {
converged = areAllCentersConverged(centroids, newCentroids, tolerance);
}

// Sync converged status
ccl::broadcast(&converged, 1, ccl::datatype::uint8, ccl_root, *comm).wait();
ccl::broadcast(&converged, 1, ccl::datatype::uint8, ccl_root, comm).wait();

centroids = newCentroids;

Expand Down
2 changes: 1 addition & 1 deletion mllib-dal/src/main/native/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ INCS := -I $(JAVA_HOME)/include \

# Use static link if possible, TBB is only available as dynamic libs

LIBS := -L${CCL_ROOT}/lib -l:libccl.a \
LIBS := -L${CCL_ROOT}/lib -lccl \
-L$(DAALROOT)/lib/intel64 -l:libdaal_core.a -l:libdaal_thread.a \
-L$(TBBROOT)/lib -ltbb -ltbbmalloc
# TODO: Add signal chaining support, should fix linking, package so and loading
Expand Down
34 changes: 21 additions & 13 deletions mllib-dal/src/main/native/OneCCL.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,34 +6,40 @@
size_t comm_size;
size_t rank_id;

ccl::communicator *getComm() {
ccl::shared_ptr_class<ccl::kvs> kvs;
static ccl::communicator b = ccl::create_communicator(comm_size, rank_id, kvs);
return &b;
std::vector<ccl::communicator> g_comms;

ccl::communicator &getComm() {
return g_comms[0];
}

JNIEXPORT jint JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_c_1init
(JNIEnv *env, jobject obj, jobject param) {
(JNIEnv *env, jobject obj, jint size, jint rank, jstring ip_port, jobject param) {

std::cout << "oneCCL (native): init" << std::endl;

ccl::init();

const char *str = env->GetStringUTFChars(ip_port, 0);
ccl::string ccl_ip_port(str);

auto kvs_attr = ccl::create_kvs_attr();
kvs_attr.set<ccl::kvs_attr_id::ip_port>(ccl_ip_port);

ccl::shared_ptr_class<ccl::kvs> kvs;
ccl::kvs::address_type main_addr;
kvs = ccl::create_kvs(main_addr);

auto comm = getComm();
kvs = ccl::create_main_kvs(kvs_attr);

rank_id = comm->rank();
comm_size = comm->size();
g_comms.push_back(ccl::create_communicator(size, rank, kvs));

rank_id = getComm().rank();
comm_size = getComm().size();

jclass cls = env->GetObjectClass(param);
jfieldID fid_comm_size = env->GetFieldID(cls, "commSize", "J");
jfieldID fid_rank_id = env->GetFieldID(cls, "rankId", "J");

env->SetLongField(param, fid_comm_size, comm_size);
env->SetLongField(param, fid_rank_id, rank_id);
env->ReleaseStringUTFChars(ip_port, str);

return 1;
}
Expand All @@ -46,6 +52,8 @@ JNIEXPORT jint JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_c_1init
// JNI entry point for OneCCL.c_cleanup: removes the most recently added
// communicator from g_comms and logs that cleanup ran.
JNIEXPORT void JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_c_1cleanup
(JNIEnv *env, jobject obj) {

  // std::vector::pop_back() on an empty vector is undefined behavior, so
  // tolerate a cleanup call that has no matching init.
  if (!g_comms.empty()) {
    g_comms.pop_back();
  }

  std::cout << "oneCCL (native): cleanup" << std::endl;

}
Expand All @@ -58,7 +66,7 @@ JNIEXPORT void JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_c_1cleanup
JNIEXPORT jboolean JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_isRoot
(JNIEnv *env, jobject obj) {

return getComm()->rank() == 0;
return getComm().rank() == 0;
}

/*
Expand All @@ -68,7 +76,7 @@ JNIEXPORT jboolean JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_isRoot
*/
JNIEXPORT jint JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_rankID
(JNIEnv *env, jobject obj) {
return getComm()->rank();
return getComm().rank();
}

/*
Expand Down
2 changes: 1 addition & 1 deletion mllib-dal/src/main/native/OneCCL.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@

#include <oneapi/ccl.hpp>

ccl::communicator *getComm();
ccl::communicator &getComm();
6 changes: 3 additions & 3 deletions mllib-dal/src/main/native/PCADALImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ JNIEXPORT jlong JNICALL Java_org_apache_spark_ml_feature_PCADALImpl_cPCATrainDAL
JNIEnv *env, jobject obj, jlong pNumTabData, jint k, jint executor_num, jint executor_cores,
jobject resultObj) {

ccl::communicator *comm = getComm();
size_t rankId = comm->rank();
ccl::communicator &comm = getComm();
size_t rankId = comm.rank();

const size_t nBlocks = executor_num;
const int comm_size = executor_num;
Expand Down Expand Up @@ -71,7 +71,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_spark_ml_feature_PCADALImpl_cPCATrainDAL
// MPI_Gather(nodeResults, perNodeArchLength, MPI_CHAR, serializedData.get(),
// perNodeArchLength, MPI_CHAR, ccl_root, MPI_COMM_WORLD);
ccl::allgatherv(nodeResults, perNodeArchLength, serializedData.get(), recv_counts,
ccl::datatype::uint8, *comm).wait();
ccl::datatype::uint8, comm).wait();

auto t2 = std::chrono::high_resolution_clock::now();

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,9 @@ class KMeansDALImpl (

}.cache()

val results = coalescedTables.mapPartitions { table =>
val results = coalescedTables.mapPartitionsWithIndex { (rank, table) =>
val tableArr = table.next()
OneCCL.init(executorNum, executorIPAddress, OneCCL.KVS_PORT)
OneCCL.init(executorNum, rank, executorIPAddress)

val initCentroids = OneDAL.makeNumericTable(centers)
val result = new KMeansResult()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ class PCADALImpl (

val executorIPAddress = Utils.sparkFirstExecutorIP(input.sparkContext)

val results = coalescedTables.mapPartitions { table =>
val results = coalescedTables.mapPartitionsWithIndex { (rank, table) =>
val tableArr = table.next()
OneCCL.init(executorNum, executorIPAddress, OneCCL.KVS_PORT)
OneCCL.init(executorNum, rank, executorIPAddress)

val result = new PCAResult()
cPCATrainDAL(
Expand Down
78 changes: 41 additions & 37 deletions mllib-dal/src/main/scala/org/apache/spark/ml/util/OneCCL.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,60 +23,64 @@ object OneCCL {

var cclParam = new CCLParam()

var kvsIPPort = sys.env.getOrElse("CCL_KVS_IP_PORT", "")
var worldSize = sys.env.getOrElse("CCL_WORLD_SIZE", "1").toInt

val KVS_PORT = 51234

private def checkEnv() {
val altTransport = sys.env.getOrElse("CCL_ATL_TRANSPORT", "")
val pmType = sys.env.getOrElse("CCL_PM_TYPE", "")
val ipExchange = sys.env.getOrElse("CCL_KVS_IP_EXCHANGE", "")

assert(altTransport == "ofi")
assert(pmType == "resizable")
assert(ipExchange == "env")
assert(kvsIPPort != "")

}
// var kvsIPPort = sys.env.getOrElse("CCL_KVS_IP_PORT", "")
// var worldSize = sys.env.getOrElse("CCL_WORLD_SIZE", "1").toInt

var kvsPort = 5000

// private def checkEnv() {
// val altTransport = sys.env.getOrElse("CCL_ATL_TRANSPORT", "")
// val pmType = sys.env.getOrElse("CCL_PM_TYPE", "")
// val ipExchange = sys.env.getOrElse("CCL_KVS_IP_EXCHANGE", "")
//
// assert(altTransport == "ofi")
// assert(pmType == "resizable")
// assert(ipExchange == "env")
// assert(kvsIPPort != "")
//
// }

// Run on Executor
def setExecutorEnv(executor_num: Int, ip: String, port: Int): Unit = {
// Work around ccl by passing in a spark.executorEnv.CCL_KVS_IP_PORT.
val ccl_kvs_ip_port = sys.env.getOrElse("CCL_KVS_IP_PORT", s"${ip}_${port}")

println(s"oneCCL: Initializing with CCL_KVS_IP_PORT: $ccl_kvs_ip_port")

setEnv("CCL_PM_TYPE", "resizable")
setEnv("CCL_ATL_TRANSPORT","ofi")
setEnv("CCL_ATL_TRANSPORT_PATH", LibLoader.getTempSubDir())
setEnv("CCL_KVS_IP_EXCHANGE","env")
setEnv("CCL_KVS_IP_PORT", ccl_kvs_ip_port)
setEnv("CCL_WORLD_SIZE", s"${executor_num}")
// Uncomment this if you want to debug oneCCL
// setEnv("CCL_LOG_LEVEL", "2")
}

def init(executor_num: Int, ip: String, port: Int) = {

setExecutorEnv(executor_num, ip, port)
// def setExecutorEnv(executor_num: Int, ip: String, port: Int): Unit = {
// // Work around ccl by passing in a spark.executorEnv.CCL_KVS_IP_PORT.
// val ccl_kvs_ip_port = sys.env.getOrElse("CCL_KVS_IP_PORT", s"${ip}_${port}")
//
// println(s"oneCCL: Initializing with CCL_KVS_IP_PORT: $ccl_kvs_ip_port")
//
// setEnv("CCL_PM_TYPE", "resizable")
// setEnv("CCL_ATL_TRANSPORT","ofi")
// setEnv("CCL_ATL_TRANSPORT_PATH", LibLoader.getTempSubDir())
// setEnv("CCL_KVS_IP_EXCHANGE","env")
// setEnv("CCL_KVS_IP_PORT", ccl_kvs_ip_port)
// setEnv("CCL_WORLD_SIZE", s"${executor_num}")
// // Uncomment this if you want to debug oneCCL
// // setEnv("CCL_LOG_LEVEL", "2")
// }

def init(executor_num: Int, rank: Int, ip: String) = {

// setExecutorEnv(executor_num, ip, port)
println(s"oneCCL: Initializing with IP_PORT: ${ip}_${kvsPort}")

// cclParam is output from native code
c_init(cclParam)
c_init(executor_num, rank, ip+"_"+kvsPort.toString, cclParam)

// The executor number should equal the oneCCL world size
assert(executor_num == cclParam.commSize, "executor number should equal to oneCCL world size")

println(s"oneCCL: Initialized with executorNum: $executor_num, commSize, ${cclParam.commSize}, rankId: ${cclParam.rankId}")

// Use a new port when calling init again
kvsPort = kvsPort + 1

}

/** Runs on the executor: delegates to the native `c_cleanup` routine. */
def cleanup(): Unit = c_cleanup()

@native private def c_init(param: CCLParam) : Int
@native private def c_init(size: Int, rank: Int, ip_port: String, param: CCLParam) : Int
@native private def c_cleanup() : Unit

@native def isRoot() : Boolean
Expand Down

0 comments on commit 99b9551

Please sign in to comment.