Skip to content

Commit

Permalink
[SPARK-5333][Mesos] MesosTaskLaunchData occurs BufferUnderflowException
Browse files Browse the repository at this point in the history
- Rewind ByteBuffer before making ByteString

(This fixes a bug introduced in apache#3849 / SPARK-4014)

Author: Jongyoul Lee <jongyoul@gmail.com>

Closes apache#4119 from jongyoul/SPARK-5333 and squashes the following commits:

c6693a8 [Jongyoul Lee] [SPARK-5333][Mesos] MesosTaskLaunchData occurs BufferUnderflowException - changed logDebug location
4141f58 [Jongyoul Lee] [SPARK-5333][Mesos] MesosTaskLaunchData occurs BufferUnderflowException - Added license information
2190606 [Jongyoul Lee] [SPARK-5333][Mesos] MesosTaskLaunchData occurs BufferUnderflowException - Adjusted imported libraries
b7f5517 [Jongyoul Lee] [SPARK-5333][Mesos] MesosTaskLaunchData occurs BufferUnderflowException - Rewind ByteBuffer before making ByteString
  • Loading branch information
jongyoul authored and JoshRosen committed Jan 20, 2015
1 parent 4afad9c commit 9d9294a
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,29 @@ import java.nio.ByteBuffer

import org.apache.mesos.protobuf.ByteString

import org.apache.spark.Logging

/**
* Wrapper for serializing the data sent when launching Mesos tasks.
*/
private[spark] case class MesosTaskLaunchData(
serializedTask: ByteBuffer,
attemptNumber: Int) {
attemptNumber: Int) extends Logging {

def toByteString: ByteString = {
val dataBuffer = ByteBuffer.allocate(4 + serializedTask.limit)
dataBuffer.putInt(attemptNumber)
dataBuffer.put(serializedTask)
dataBuffer.rewind
logDebug(s"ByteBuffer size: [${dataBuffer.remaining}]")
ByteString.copyFrom(dataBuffer)
}
}

private[spark] object MesosTaskLaunchData {
private[spark] object MesosTaskLaunchData extends Logging {
def fromByteString(byteString: ByteString): MesosTaskLaunchData = {
val byteBuffer = byteString.asReadOnlyByteBuffer()
logDebug(s"ByteBuffer size: [${byteBuffer.remaining}]")
val attemptNumber = byteBuffer.getInt // updates the position by 4 bytes
val serializedTask = byteBuffer.slice() // subsequence starting at the current position
MesosTaskLaunchData(serializedTask, attemptNumber)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.scheduler.mesos

import java.nio.ByteBuffer

import org.scalatest.FunSuite

import org.apache.spark.scheduler.cluster.mesos.MesosTaskLaunchData

class MesosTaskLaunchDataSuite extends FunSuite {
test("serialize and deserialize data must be same") {
val serializedTask = ByteBuffer.allocate(40)
(Range(100, 110).map(serializedTask.putInt(_)))
serializedTask.rewind
val attemptNumber = 100
val byteString = MesosTaskLaunchData(serializedTask, attemptNumber).toByteString
serializedTask.rewind
val mesosTaskLaunchData = MesosTaskLaunchData.fromByteString(byteString)
assert(mesosTaskLaunchData.attemptNumber == attemptNumber)
assert(mesosTaskLaunchData.serializedTask.equals(serializedTask))
}
}

0 comments on commit 9d9294a

Please sign in to comment.