Skip to content

Commit

Permalink
[LIHADOOP-71407] Store bytebuffer in the db instead of String (apache…
Browse files Browse the repository at this point in the history
…#111)

* [LIHADOOP-71407] Store bytebuffer in the db instead of String
  • Loading branch information
otterc committed Jun 7, 2023
1 parent 2cbe049 commit 9d3cf88
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,9 @@ public void initializeApplication(ApplicationInitializationContext context) {
if (db != null && AppsWithRecoveryDisabled.isRecoveryEnabledForApp(appId)) {
AppId fullId = new AppId(appId);
byte[] key = dbAppKey(fullId);
byte[] value = mapper.writeValueAsString(shuffleSecret).getBytes(StandardCharsets.UTF_8);
ByteBuffer dbVal = metaInfo != null ?
JavaUtils.stringToBytes(shuffleSecret) : appServiceData;
byte[] value = mapper.writeValueAsString(dbVal).getBytes(StandardCharsets.UTF_8);
db.put(key, value);
}
secretManager.registerApp(appId, shuffleSecret);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ abstract class YarnShuffleServiceSuite extends SparkFunSuite with Matchers {
private[yarn] val SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithNoAttemptID =
"org.apache.spark.shuffle.sort.SortShuffleManager:{\"mergeDir\": \"merge_manager\"}"
private val DUMMY_BLOCK_DATA = "dummyBlockData".getBytes(StandardCharsets.UTF_8)
private val DUMMY_PASSWORD = "dummyPassword"
private val EMPTY_PASSWORD = ""

private var recoveryLocalDir: File = _
protected var tempDir: File = _
Expand Down Expand Up @@ -191,7 +193,8 @@ abstract class YarnShuffleServiceSuite extends SparkFunSuite with Matchers {
val app3Data = makeAppInfo("user", app3Id)
s1.initializeApplication(app3Data)
val app4Id = ApplicationId.newInstance(0, 4)
val app4Data = makeAppInfo("user", app4Id)
val app4Data = makeAppInfo("user", app4Id, metadataStorageDisabled = false,
authEnabled = true, DUMMY_PASSWORD)
s1.initializeApplication(app4Data)

val execStateFile = s1.registeredExecutorFile
Expand Down Expand Up @@ -1038,15 +1041,15 @@ abstract class YarnShuffleServiceSuite extends SparkFunSuite with Matchers {

private def makeAppInfo(user: String, appId: ApplicationId,
metadataStorageDisabled: Boolean = false,
authEnabled: Boolean = true): ApplicationInitializationContext = {
authEnabled: Boolean = true,
password: String = EMPTY_PASSWORD): ApplicationInitializationContext = {
if (!metadataStorageDisabled) {
val secret = ByteBuffer.wrap(new Array[Byte](0))
new ApplicationInitializationContext(user, appId, secret)
new ApplicationInitializationContext(user, appId, JavaUtils.stringToBytes(password))
} else {
val payload = new mutable.HashMap[String, Object]()
payload.put(YarnShuffleService.SPARK_SHUFFLE_SERVER_RECOVERY_DISABLED, java.lang.Boolean.TRUE)
if (authEnabled) {
payload.put(YarnShuffleService.SECRET_KEY, "")
payload.put(YarnShuffleService.SECRET_KEY, password)
}
val mapper = new ObjectMapper()
mapper.registerModule(DefaultScalaModule)
Expand Down Expand Up @@ -1133,13 +1136,15 @@ abstract class YarnShuffleServiceSuite extends SparkFunSuite with Matchers {
yarnConfig.setBoolean(SecurityManager.SPARK_AUTH_CONF, true)
s1 = createYarnShuffleService()
val app1Id = ApplicationId.newInstance(1681252509, 1)
val app1Data = makeAppInfo("user", app1Id, metadataStorageDisabled = true)
val app1Data = makeAppInfo("user", app1Id, metadataStorageDisabled = true,
authEnabled = true, EMPTY_PASSWORD)
s1.initializeApplication(app1Data)
val app2Id = ApplicationId.newInstance(1681252509, 2)
val app2Data = makeAppInfo("user", app2Id)
val app2Data = makeAppInfo("user", app2Id, metadataStorageDisabled = false,
authEnabled = true, DUMMY_PASSWORD)
s1.initializeApplication(app2Data)
assert(s1.secretManager.getSecretKey(app1Id.toString()) == "")
assert(s1.secretManager.getSecretKey(app2Id.toString()) == "")
assert(s1.secretManager.getSecretKey(app1Id.toString()) == EMPTY_PASSWORD)
assert(s1.secretManager.getSecretKey(app2Id.toString()) == DUMMY_PASSWORD)

val execShuffleInfo1 =
new ExecutorShuffleInfo(
Expand Down Expand Up @@ -1191,7 +1196,7 @@ abstract class YarnShuffleServiceSuite extends SparkFunSuite with Matchers {
s2 = createYarnShuffleService()
// Since secret of app1 is not saved in the db, it isn't recovered
assert(s2.secretManager.getSecretKey(app1Id.toString()) == null)
assert(s2.secretManager.getSecretKey(app2Id.toString()) == "")
assert(s2.secretManager.getSecretKey(app2Id.toString()) == DUMMY_PASSWORD)

val resolver2 = ShuffleTestAccessor.getBlockResolver(s2.blockHandler)
val mergeManager2 = s2.shuffleMergeManager.asInstanceOf[RemoteBlockPushResolver]
Expand Down

0 comments on commit 9d3cf88

Please sign in to comment.