From acb81db2d5f83ab7e16c8d8ba20b1c3674e385a3 Mon Sep 17 00:00:00 2001 From: jiafu zhang Date: Fri, 4 Nov 2022 17:00:14 +0800 Subject: [PATCH] [REMOTE-SHUFFLE-56] Make shuffle object class and object hint configurable (#59) * [REMOTE-SHUFFLE-56] Make shuffle object class and object hint configurable Signed-off-by: jiafu zhang * [REMOTE-SHUFFLE-56] Make shuffle object class and object hint configurable Signed-off-by: jiafu zhang * [REMOTE-SHUFFLE-56] Make shuffle object class and object hint configurable Signed-off-by: Jifu Zhang Signed-off-by: jiafu zhang Signed-off-by: Jifu Zhang --- pom.xml | 2 +- shuffle-daos/pom.xml | 4 ++-- .../spark/shuffle/daos/DaosWriterAsync.java | 4 ++-- .../org/apache/spark/shuffle/daos/IOManager.java | 13 ++++++++++++- .../org/apache/spark/shuffle/daos/package.scala | 16 ++++++++++++++++ .../spark/shuffle/daos/DaosShuffleIOTest.java | 3 ++- shuffle-hadoop/pom.xml | 2 +- 7 files changed, 36 insertions(+), 8 deletions(-) diff --git a/pom.xml b/pom.xml index 31862918..bcbdb0eb 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.intel.oap remote-shuffle-parent - 1.2.0 + 2.2.0-SNAPSHOT OAP Remote Shuffle Parent POM pom diff --git a/shuffle-daos/pom.xml b/shuffle-daos/pom.xml index 43120664..3d78a73d 100644 --- a/shuffle-daos/pom.xml +++ b/shuffle-daos/pom.xml @@ -7,7 +7,7 @@ com.intel.oap remote-shuffle-parent - 1.2.0 + 2.2.0-SNAPSHOT remote-shuffle-daos OAP Remote Shuffle Based on DAOS Object API @@ -242,7 +242,7 @@ io.daos daos-java - 1.2.1-SNAPSHOT + 2.2.1 junit diff --git a/shuffle-daos/src/main/java/org/apache/spark/shuffle/daos/DaosWriterAsync.java b/shuffle-daos/src/main/java/org/apache/spark/shuffle/daos/DaosWriterAsync.java index ae9d4c89..2ea0110e 100644 --- a/shuffle-daos/src/main/java/org/apache/spark/shuffle/daos/DaosWriterAsync.java +++ b/shuffle-daos/src/main/java/org/apache/spark/shuffle/daos/DaosWriterAsync.java @@ -214,7 +214,7 @@ static class AsyncDescCache implements ObjectCache { private int total; private IODescUpdAsync[] array; - public AsyncDescCache(int maxNbr) { + AsyncDescCache(int maxNbr) { this.array = new IODescUpdAsync[maxNbr]; } @@ -250,7 +250,7 @@ public void put(IODescUpdAsync desc) { @Override public boolean isFull() { - return total == array.length; + return idx >= array.length; } public void release() { diff --git a/shuffle-daos/src/main/java/org/apache/spark/shuffle/daos/IOManager.java b/shuffle-daos/src/main/java/org/apache/spark/shuffle/daos/IOManager.java index 9121552d..4de373ac 100644 --- a/shuffle-daos/src/main/java/org/apache/spark/shuffle/daos/IOManager.java +++ b/shuffle-daos/src/main/java/org/apache/spark/shuffle/daos/IOManager.java @@ -23,11 +23,16 @@ package org.apache.spark.shuffle.daos; +import io.daos.DaosObjClassHint; +import io.daos.DaosObjectClass; +import io.daos.DaosObjectType; import io.daos.obj.DaosObjClient; import io.daos.obj.DaosObject; import io.daos.obj.DaosObjectException; import io.daos.obj.DaosObjectId; import org.apache.spark.SparkConf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Map; @@ -40,6 +45,8 @@ public abstract class IOManager { protected DaosObjClient objClient; + private static Logger log = LoggerFactory.getLogger(IOManager.class); + protected IOManager(SparkConf conf, Map objectMap) { this.conf = conf; this.objectMap = objectMap; @@ -58,8 +65,11 @@ protected DaosObject getObject(long appId, int shuffleId) throws DaosObjectExcep DaosObject object = objectMap.get(key); if (object == null) { DaosObjectId id = new DaosObjectId(appId, shuffleId); - id.encode(); + id.encode(objClient.getContPtr(), DaosObjectType.DAOS_OT_DKEY_UINT64, + DaosObjectClass.valueOf(conf.get(package$.MODULE$.SHUFFLE_DAOS_OBJECT_CLASS())), + DaosObjClassHint.valueOf(conf.get(package$.MODULE$.SHUFFLE_DAOS_OBJECT_HINT())), 0); object = objClient.getObject(id); + log.info("created new object, oid high: " + object.getOid().getHigh() + ", low: " + object.getOid().getLow()); objectMap.putIfAbsent(key, object); DaosObject activeObject = objectMap.get(key); if (activeObject != object) { // release just created DaosObject @@ -73,6 +83,7 @@ protected DaosObject getObject(long appId, int shuffleId) throws DaosObjectExcep object.open(); } } + log.info("oid high: " + object.getOid().getHigh() + ", low: " + object.getOid().getLow()); return object; } diff --git a/shuffle-daos/src/main/scala/org/apache/spark/shuffle/daos/package.scala b/shuffle-daos/src/main/scala/org/apache/spark/shuffle/daos/package.scala index 4fca6293..251ca694 100644 --- a/shuffle-daos/src/main/scala/org/apache/spark/shuffle/daos/package.scala +++ b/shuffle-daos/src/main/scala/org/apache/spark/shuffle/daos/package.scala @@ -272,4 +272,20 @@ package object daos { .checkValue(v => v > 0 & v < 0.5, "spill grant percentage should be greater than 0 and no more" + " than 0.5 .") .createWithDefault(0.1) + + val SHUFFLE_DAOS_OBJECT_CLASS = + ConfigBuilder("spark.shuffle.daos.object.class") + .doc("class of DAOS object for storing shuffled data. It tells DAOS how object data is stored and replicated. " + + "Check io.daos.DaosObjectClass for all available classes.") + .version("3.1.1") + .stringConf + .createWithDefault("OC_UNKNOWN") + + val SHUFFLE_DAOS_OBJECT_HINT = + ConfigBuilder("spark.shuffle.daos.object.hint") + .doc("hint of DAOS object class. It's about data redundancy and sharding in DAOS. Check " + + "io.daos.DaosObjClassHint for all available hints.") + .version("3.1.1") + .stringConf + .createWithDefault("DAOS_OCH_SHD_MAX") } diff --git a/shuffle-daos/src/test/java/org/apache/spark/shuffle/daos/DaosShuffleIOTest.java b/shuffle-daos/src/test/java/org/apache/spark/shuffle/daos/DaosShuffleIOTest.java index f28dfff6..fb69125c 100644 --- a/shuffle-daos/src/test/java/org/apache/spark/shuffle/daos/DaosShuffleIOTest.java +++ b/shuffle-daos/src/test/java/org/apache/spark/shuffle/daos/DaosShuffleIOTest.java @@ -75,7 +75,8 @@ public void testSingleObjectInstanceOpen() throws Exception { DaosObjectId id = PowerMockito.mock(DaosObjectId.class); PowerMockito.whenNew(DaosObjectId.class).withArguments(appId, Long.valueOf(shuffleId)).thenReturn(id); - Mockito.doNothing().when(id).encode(); + Mockito.doNothing().when(id).encode(Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.any(), + Mockito.eq(0)); Mockito.when(id.isEncoded()).thenReturn(true); DaosObject daosObject = PowerMockito.mock(DaosObject.class); DaosObjClient client = PowerMockito.mock(DaosObjClient.class); diff --git a/shuffle-hadoop/pom.xml b/shuffle-hadoop/pom.xml index a402f4d3..d9794fc5 100644 --- a/shuffle-hadoop/pom.xml +++ b/shuffle-hadoop/pom.xml @@ -5,7 +5,7 @@ com.intel.oap remote-shuffle-parent - 1.2.0 + 2.2.0-SNAPSHOT remote-shuffle-hadoop