Skip to content

Commit

Permalink
[REMOTE-SHUFFLE-56] Make shuffle object class and object hint configu…
Browse files Browse the repository at this point in the history
…rable (#59)

* [REMOTE-SHUFFLE-56] Make shuffle object class and object hint configurable

Signed-off-by: jiafu zhang <jiafu.zhang@intel.com>

* [REMOTE-SHUFFLE-56] Make shuffle object class and object hint configurable

Signed-off-by: jiafu zhang <jiafu.zhang@intel.com>

* [REMOTE-SHUFFLE-56] Make shuffle object class and object hint configurable

Signed-off-by: Jifu Zhang <jiafu.zhang@intel.com>

Signed-off-by: jiafu zhang <jiafu.zhang@intel.com>
Signed-off-by: Jifu Zhang <jiafu.zhang@intel.com>
  • Loading branch information
jiafuzha authored Nov 4, 2022
1 parent a1fddc1 commit acb81db
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 8 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.intel.oap</groupId>
<artifactId>remote-shuffle-parent</artifactId>
<version>1.2.0</version>
<version>2.2.0-SNAPSHOT</version>
<name>OAP Remote Shuffle Parent POM</name>
<packaging>pom</packaging>

Expand Down
4 changes: 2 additions & 2 deletions shuffle-daos/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>com.intel.oap</groupId>
<artifactId>remote-shuffle-parent</artifactId>
<version>1.2.0</version>
<version>2.2.0-SNAPSHOT</version>
</parent>
<artifactId>remote-shuffle-daos</artifactId>
<name>OAP Remote Shuffle Based on DAOS Object API</name>
Expand Down Expand Up @@ -242,7 +242,7 @@
<dependency>
<groupId>io.daos</groupId>
<artifactId>daos-java</artifactId>
<version>1.2.1-SNAPSHOT</version>
<version>2.2.1</version>
</dependency>
<dependency>
<groupId>junit</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ static class AsyncDescCache implements ObjectCache<IODescUpdAsync> {
private int total;
private IODescUpdAsync[] array;

public AsyncDescCache(int maxNbr) {
AsyncDescCache(int maxNbr) {
this.array = new IODescUpdAsync[maxNbr];
}

Expand Down Expand Up @@ -250,7 +250,7 @@ public void put(IODescUpdAsync desc) {

@Override
public boolean isFull() {
return total == array.length;
return idx >= array.length;
}

public void release() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,16 @@

package org.apache.spark.shuffle.daos;

import io.daos.DaosObjClassHint;
import io.daos.DaosObjectClass;
import io.daos.DaosObjectType;
import io.daos.obj.DaosObjClient;
import io.daos.obj.DaosObject;
import io.daos.obj.DaosObjectException;
import io.daos.obj.DaosObjectId;
import org.apache.spark.SparkConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Map;
Expand All @@ -40,6 +45,8 @@ public abstract class IOManager {

protected DaosObjClient objClient;

private static Logger log = LoggerFactory.getLogger(IOManager.class);

protected IOManager(SparkConf conf, Map<String, DaosObject> objectMap) {
this.conf = conf;
this.objectMap = objectMap;
Expand All @@ -58,8 +65,11 @@ protected DaosObject getObject(long appId, int shuffleId) throws DaosObjectExcep
DaosObject object = objectMap.get(key);
if (object == null) {
DaosObjectId id = new DaosObjectId(appId, shuffleId);
id.encode();
id.encode(objClient.getContPtr(), DaosObjectType.DAOS_OT_DKEY_UINT64,
DaosObjectClass.valueOf(conf.get(package$.MODULE$.SHUFFLE_DAOS_OBJECT_CLASS())),
DaosObjClassHint.valueOf(conf.get(package$.MODULE$.SHUFFLE_DAOS_OBJECT_HINT())), 0);
object = objClient.getObject(id);
log.info("created new object, oid high: " + object.getOid().getHigh() + ", low: " + object.getOid().getLow());
objectMap.putIfAbsent(key, object);
DaosObject activeObject = objectMap.get(key);
if (activeObject != object) { // release just created DaosObject
Expand All @@ -73,6 +83,7 @@ protected DaosObject getObject(long appId, int shuffleId) throws DaosObjectExcep
object.open();
}
}
log.info("oid high: " + object.getOid().getHigh() + ", low: " + object.getOid().getLow());
return object;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,4 +272,20 @@ package object daos {
.checkValue(v => v > 0 & v < 0.5, "spill grant percentage should be greater than 0 and no more" +
" than 0.5 .")
.createWithDefault(0.1)

val SHUFFLE_DAOS_OBJECT_CLASS =
ConfigBuilder("spark.shuffle.daos.object.class")
.doc("class of DAOS object for storing shuffled data. It tells DAOS how object data is stored and replicated. " +
"Check io.daos.DaosObjectClass for all available classes.")
.version("3.1.1")
.stringConf
.createWithDefault("OC_UNKNOWN")

val SHUFFLE_DAOS_OBJECT_HINT =
ConfigBuilder("spark.shuffle.daos.object.hint")
.doc("hint of DAOS object class. It's about data redundancy and sharding in DAOS. Check " +
"io.daos.DaosObjClassHint for all available hints.")
.version("3.1.1")
.stringConf
.createWithDefault("DAOS_OCH_SHD_MAX")
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ public void testSingleObjectInstanceOpen() throws Exception {

DaosObjectId id = PowerMockito.mock(DaosObjectId.class);
PowerMockito.whenNew(DaosObjectId.class).withArguments(appId, Long.valueOf(shuffleId)).thenReturn(id);
Mockito.doNothing().when(id).encode();
Mockito.doNothing().when(id).encode(Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.any(),
Mockito.eq(0));
Mockito.when(id.isEncoded()).thenReturn(true);
DaosObject daosObject = PowerMockito.mock(DaosObject.class);
DaosObjClient client = PowerMockito.mock(DaosObjClient.class);
Expand Down
2 changes: 1 addition & 1 deletion shuffle-hadoop/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>com.intel.oap</groupId>
<artifactId>remote-shuffle-parent</artifactId>
<version>1.2.0</version>
<version>2.2.0-SNAPSHOT</version>
</parent>

<artifactId>remote-shuffle-hadoop</artifactId>
Expand Down

0 comments on commit acb81db

Please sign in to comment.