Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-2554][SQL] CountDistinct partial aggregation and object allocation improvements #1935

Closed
wants to merge 28 commits into from
Closed
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
213ada8
First draft of partially aggregated and code generated count distinct…
marmbrus Aug 17, 2014
bd08239
WIP
marmbrus Aug 17, 2014
050bb97
Skip no-arg constructors for kryo,
marmbrus Aug 17, 2014
41fbd1d
Never try and create an empty hash set.
marmbrus Aug 17, 2014
d494598
Fix tests now that the planner is better
marmbrus Aug 18, 2014
9153652
better toString
marmbrus Aug 18, 2014
38c7449
comments and style
marmbrus Aug 18, 2014
f31b8ad
more fixes
marmbrus Aug 18, 2014
c1f7114
Improve tests / fix serialization.
marmbrus Aug 18, 2014
b3d0f64
Add golden files.
marmbrus Aug 18, 2014
57ae3b1
Fix order dependent test
marmbrus Aug 18, 2014
27984d0
Merge remote-tracking branch 'origin/master' into countDistinctPartial
marmbrus Aug 18, 2014
abee26d
WIP
marmbrus Aug 18, 2014
87d101d
Fix isNullAt bug
marmbrus Aug 19, 2014
58d15f1
disable codegen logging
marmbrus Aug 19, 2014
8ff6402
Add specific row.
marmbrus Aug 19, 2014
2b46c4b
Merge remote-tracking branch 'origin/master' into countDistinctPartial
marmbrus Aug 19, 2014
c9e67de
Made SpecificRow and types serializable by Kryo
GregOwen Aug 19, 2014
3868f6c
Merge pull request #9 from GregOwen/countDistinctPartial
marmbrus Aug 19, 2014
db44a30
JIT hax.
marmbrus Aug 19, 2014
93d0f64
metastore concurrency fix.
marmbrus Aug 19, 2014
fdca896
cleanup
marmbrus Aug 19, 2014
fae38f4
Fix style
marmbrus Aug 19, 2014
b2e8ef3
Merge remote-tracking branch 'origin/master' into countDistinctPartial
marmbrus Aug 20, 2014
c122cca
Address comments, add tests
marmbrus Aug 20, 2014
32d216f
reynolds comments
marmbrus Aug 20, 2014
8074a80
fix tests
marmbrus Aug 20, 2014
5c7848d
turn off caching in the constructor
marmbrus Aug 23, 2014
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ class InterpretedProjection(expressions: Seq[Expression]) extends Projection {
def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) =
this(expressions.map(BindReferences.bindReference(_, inputSchema)))

protected val exprArray = expressions.toArray
// null check is required for when Kryo invokes the no-arg constructor.
protected val exprArray = if (expressions != null) expressions.toArray else null

def apply(input: Row): Row = {
val outputArray = new Array[Any](exprArray.length)
Expand Down Expand Up @@ -109,7 +110,346 @@ class JoinedRow extends Row {
def apply(i: Int) =
if (i < row1.size) row1(i) else row2(i - row1.size)

def isNullAt(i: Int) = apply(i) == null
def isNullAt(i: Int) =
if (i < row1.size) row1.isNullAt(i) else row2.isNullAt(i - row1.size)

def getInt(i: Int): Int =
if (i < row1.size) row1.getInt(i) else row2.getInt(i - row1.size)

def getLong(i: Int): Long =
if (i < row1.size) row1.getLong(i) else row2.getLong(i - row1.size)

def getDouble(i: Int): Double =
if (i < row1.size) row1.getDouble(i) else row2.getDouble(i - row1.size)

def getBoolean(i: Int): Boolean =
if (i < row1.size) row1.getBoolean(i) else row2.getBoolean(i - row1.size)

def getShort(i: Int): Short =
if (i < row1.size) row1.getShort(i) else row2.getShort(i - row1.size)

def getByte(i: Int): Byte =
if (i < row1.size) row1.getByte(i) else row2.getByte(i - row1.size)

def getFloat(i: Int): Float =
if (i < row1.size) row1.getFloat(i) else row2.getFloat(i - row1.size)

def getString(i: Int): String =
if (i < row1.size) row1.getString(i) else row2.getString(i - row1.size)

def copy() = {
val totalSize = row1.size + row2.size
val copiedValues = new Array[Any](totalSize)
var i = 0
while(i < totalSize) {
copiedValues(i) = apply(i)
i += 1
}
new GenericRow(copiedValues)
}

override def toString() = {
val row = (if (row1 != null) row1 else Seq[Any]()) ++ (if (row2 != null) row2 else Seq[Any]())
s"[${row.mkString(",")}]"
}
}

/**
 * JIT HACK: Replace with macros
 *
 * A special version of `JoinedRow` meant to be used from only a single location in the code.
 * `JoinedRow` is used in many performance-critical situations, and because several different
 * kinds of `Row` can be stored as `row1` and `row2`, most calls on the critical path are
 * polymorphic. Dedicating a separate class to one location increases the chance that only a
 * single type of `Row` is ever referenced there, which gives the JIT more opportunity to play
 * tricks. This sounds crazy, but in benchmarks it had noticeable effects.
 *
 * The two base rows are presented as one logical row: ordinals below `row1.size` are answered by
 * `row1`, and every other ordinal is answered by `row2` after subtracting `row1.size`.
 */
class JoinedRow2 extends Row {
  private[this] var row1: Row = _
  private[this] var row2: Row = _

  /** Builds a joined row with `left` as the first base row and `right` as the second. */
  def this(left: Row, right: Row) = {
    this()
    row1 = left
    row2 = right
  }

  /** Re-points this row at two new base rows and returns this same instance. */
  def apply(r1: Row, r2: Row): Row = {
    row1 = r1
    row2 = r2
    this
  }

  /** Swaps in a new left base row, leaving the right one untouched, and returns this instance. */
  def withLeft(newLeft: Row): Row = {
    row1 = newLeft
    this
  }

  /** Swaps in a new right base row, leaving the left one untouched, and returns this instance. */
  def withRight(newRight: Row): Row = {
    row2 = newRight
    this
  }

  /** Iterates over the values of the left row followed by the values of the right row. */
  def iterator = row1.iterator ++ row2.iterator

  /** Combined number of fields in both base rows. */
  def length = row1.length + row2.length

  /** Value at ordinal `i`, read from whichever base row covers that ordinal. */
  def apply(i: Int) = {
    val leftWidth = row1.size
    if (i < leftWidth) row1(i) else row2(i - leftWidth)
  }

  /** Delegates the null check for ordinal `i` to the base row that covers it. */
  def isNullAt(i: Int) = {
    val leftWidth = row1.size
    if (i < leftWidth) row1.isNullAt(i) else row2.isNullAt(i - leftWidth)
  }

  // The typed getters below all route to the base row that owns ordinal `i`.

  def getInt(i: Int): Int = {
    val leftWidth = row1.size
    if (i < leftWidth) row1.getInt(i) else row2.getInt(i - leftWidth)
  }

  def getLong(i: Int): Long = {
    val leftWidth = row1.size
    if (i < leftWidth) row1.getLong(i) else row2.getLong(i - leftWidth)
  }

  def getDouble(i: Int): Double = {
    val leftWidth = row1.size
    if (i < leftWidth) row1.getDouble(i) else row2.getDouble(i - leftWidth)
  }

  def getBoolean(i: Int): Boolean = {
    val leftWidth = row1.size
    if (i < leftWidth) row1.getBoolean(i) else row2.getBoolean(i - leftWidth)
  }

  def getShort(i: Int): Short = {
    val leftWidth = row1.size
    if (i < leftWidth) row1.getShort(i) else row2.getShort(i - leftWidth)
  }

  def getByte(i: Int): Byte = {
    val leftWidth = row1.size
    if (i < leftWidth) row1.getByte(i) else row2.getByte(i - leftWidth)
  }

  def getFloat(i: Int): Float = {
    val leftWidth = row1.size
    if (i < leftWidth) row1.getFloat(i) else row2.getFloat(i - leftWidth)
  }

  def getString(i: Int): String = {
    val leftWidth = row1.size
    if (i < leftWidth) row1.getString(i) else row2.getString(i - leftWidth)
  }

  /** Materializes every field of both base rows into a new `GenericRow`. */
  def copy() = {
    val fieldCount = row1.size + row2.size
    val snapshot = new Array[Any](fieldCount)
    // Plain while loop over ordinals; each value is read through `apply(i)`.
    var ordinal = 0
    while (ordinal < fieldCount) {
      snapshot(ordinal) = apply(ordinal)
      ordinal += 1
    }
    new GenericRow(snapshot)
  }

  /** Renders the fields as `[v1,v2,...]`; a base row that is still null contributes nothing. */
  override def toString() = {
    val leftPart = if (row1 == null) Seq[Any]() else row1
    val rightPart = if (row2 == null) Seq[Any]() else row2
    (leftPart ++ rightPart).mkString("[", ",", "]")
  }
}

/**
 * JIT HACK: Replace with macros
 *
 * Another single-purpose duplicate of the joined-row implementation. It presents two base rows as
 * one logical row: ordinals below `row1.size` are answered by `row1`, and every other ordinal is
 * answered by `row2` after subtracting `row1.size`.
 */
class JoinedRow3 extends Row {
  private[this] var row1: Row = _
  private[this] var row2: Row = _

  /** Builds a joined row with `left` as the first base row and `right` as the second. */
  def this(left: Row, right: Row) = {
    this()
    row1 = left
    row2 = right
  }

  /** Re-points this row at two new base rows and returns this same instance. */
  def apply(r1: Row, r2: Row): Row = {
    row1 = r1
    row2 = r2
    this
  }

  /** Swaps in a new left base row, leaving the right one untouched, and returns this instance. */
  def withLeft(newLeft: Row): Row = {
    row1 = newLeft
    this
  }

  /** Swaps in a new right base row, leaving the left one untouched, and returns this instance. */
  def withRight(newRight: Row): Row = {
    row2 = newRight
    this
  }

  /** Iterates over the values of the left row followed by the values of the right row. */
  def iterator = row1.iterator ++ row2.iterator

  /** Combined number of fields in both base rows. */
  def length = row1.length + row2.length

  /** Value at ordinal `i`, read from whichever base row covers that ordinal. */
  def apply(i: Int) = {
    val leftWidth = row1.size
    if (i < leftWidth) row1(i) else row2(i - leftWidth)
  }

  /** Delegates the null check for ordinal `i` to the base row that covers it. */
  def isNullAt(i: Int) = {
    val leftWidth = row1.size
    if (i < leftWidth) row1.isNullAt(i) else row2.isNullAt(i - leftWidth)
  }

  // The typed getters below all route to the base row that owns ordinal `i`.

  def getInt(i: Int): Int = {
    val leftWidth = row1.size
    if (i < leftWidth) row1.getInt(i) else row2.getInt(i - leftWidth)
  }

  def getLong(i: Int): Long = {
    val leftWidth = row1.size
    if (i < leftWidth) row1.getLong(i) else row2.getLong(i - leftWidth)
  }

  def getDouble(i: Int): Double = {
    val leftWidth = row1.size
    if (i < leftWidth) row1.getDouble(i) else row2.getDouble(i - leftWidth)
  }

  def getBoolean(i: Int): Boolean = {
    val leftWidth = row1.size
    if (i < leftWidth) row1.getBoolean(i) else row2.getBoolean(i - leftWidth)
  }

  def getShort(i: Int): Short = {
    val leftWidth = row1.size
    if (i < leftWidth) row1.getShort(i) else row2.getShort(i - leftWidth)
  }

  def getByte(i: Int): Byte = {
    val leftWidth = row1.size
    if (i < leftWidth) row1.getByte(i) else row2.getByte(i - leftWidth)
  }

  def getFloat(i: Int): Float = {
    val leftWidth = row1.size
    if (i < leftWidth) row1.getFloat(i) else row2.getFloat(i - leftWidth)
  }

  def getString(i: Int): String = {
    val leftWidth = row1.size
    if (i < leftWidth) row1.getString(i) else row2.getString(i - leftWidth)
  }

  /** Materializes every field of both base rows into a new `GenericRow`. */
  def copy() = {
    val fieldCount = row1.size + row2.size
    val snapshot = new Array[Any](fieldCount)
    // Plain while loop over ordinals; each value is read through `apply(i)`.
    var ordinal = 0
    while (ordinal < fieldCount) {
      snapshot(ordinal) = apply(ordinal)
      ordinal += 1
    }
    new GenericRow(snapshot)
  }

  /** Renders the fields as `[v1,v2,...]`; a base row that is still null contributes nothing. */
  override def toString() = {
    val leftPart = if (row1 == null) Seq[Any]() else row1
    val rightPart = if (row2 == null) Seq[Any]() else row2
    (leftPart ++ rightPart).mkString("[", ",", "]")
  }
}

/**
 * JIT HACK: Replace with macros
 *
 * Another single-purpose duplicate of the joined-row implementation. It presents two base rows as
 * one logical row: ordinals below `row1.size` are answered by `row1`, and every other ordinal is
 * answered by `row2` after subtracting `row1.size`.
 */
class JoinedRow4 extends Row {
  private[this] var row1: Row = _
  private[this] var row2: Row = _

  /** Builds a joined row with `left` as the first base row and `right` as the second. */
  def this(left: Row, right: Row) = {
    this()
    row1 = left
    row2 = right
  }

  /** Re-points this row at two new base rows and returns this same instance. */
  def apply(r1: Row, r2: Row): Row = {
    row1 = r1
    row2 = r2
    this
  }

  /** Swaps in a new left base row, leaving the right one untouched, and returns this instance. */
  def withLeft(newLeft: Row): Row = {
    row1 = newLeft
    this
  }

  /** Swaps in a new right base row, leaving the left one untouched, and returns this instance. */
  def withRight(newRight: Row): Row = {
    row2 = newRight
    this
  }

  /** Iterates over the values of the left row followed by the values of the right row. */
  def iterator = row1.iterator ++ row2.iterator

  /** Combined number of fields in both base rows. */
  def length = row1.length + row2.length

  /** Value at ordinal `i`, read from whichever base row covers that ordinal. */
  def apply(i: Int) = {
    val leftWidth = row1.size
    if (i < leftWidth) row1(i) else row2(i - leftWidth)
  }

  /** Delegates the null check for ordinal `i` to the base row that covers it. */
  def isNullAt(i: Int) = {
    val leftWidth = row1.size
    if (i < leftWidth) row1.isNullAt(i) else row2.isNullAt(i - leftWidth)
  }

  // The typed getters below all route to the base row that owns ordinal `i`.

  def getInt(i: Int): Int = {
    val leftWidth = row1.size
    if (i < leftWidth) row1.getInt(i) else row2.getInt(i - leftWidth)
  }

  def getLong(i: Int): Long = {
    val leftWidth = row1.size
    if (i < leftWidth) row1.getLong(i) else row2.getLong(i - leftWidth)
  }

  def getDouble(i: Int): Double = {
    val leftWidth = row1.size
    if (i < leftWidth) row1.getDouble(i) else row2.getDouble(i - leftWidth)
  }

  def getBoolean(i: Int): Boolean = {
    val leftWidth = row1.size
    if (i < leftWidth) row1.getBoolean(i) else row2.getBoolean(i - leftWidth)
  }

  def getShort(i: Int): Short = {
    val leftWidth = row1.size
    if (i < leftWidth) row1.getShort(i) else row2.getShort(i - leftWidth)
  }

  def getByte(i: Int): Byte = {
    val leftWidth = row1.size
    if (i < leftWidth) row1.getByte(i) else row2.getByte(i - leftWidth)
  }

  def getFloat(i: Int): Float = {
    val leftWidth = row1.size
    if (i < leftWidth) row1.getFloat(i) else row2.getFloat(i - leftWidth)
  }

  def getString(i: Int): String = {
    val leftWidth = row1.size
    if (i < leftWidth) row1.getString(i) else row2.getString(i - leftWidth)
  }

  /** Materializes every field of both base rows into a new `GenericRow`. */
  def copy() = {
    val fieldCount = row1.size + row2.size
    val snapshot = new Array[Any](fieldCount)
    // Plain while loop over ordinals; each value is read through `apply(i)`.
    var ordinal = 0
    while (ordinal < fieldCount) {
      snapshot(ordinal) = apply(ordinal)
      ordinal += 1
    }
    new GenericRow(snapshot)
  }

  /** Renders the fields as `[v1,v2,...]`; a base row that is still null contributes nothing. */
  override def toString() = {
    val leftPart = if (row1 == null) Seq[Any]() else row1
    val rightPart = if (row2 == null) Seq[Any]() else row2
    (leftPart ++ rightPart).mkString("[", ",", "]")
  }
}

/**
* JIT HACK: Replace with macros
*/
class JoinedRow5 extends Row {
private[this] var row1: Row = _
private[this] var row2: Row = _

def this(left: Row, right: Row) = {
this()
row1 = left
row2 = right
}

/** Updates this JoinedRow to point at two new base rows. Returns itself. */
def apply(r1: Row, r2: Row): Row = {
row1 = r1
row2 = r2
this
}

/** Updates this JoinedRow by updating its left base row. Returns itself. */
def withLeft(newLeft: Row): Row = {
row1 = newLeft
this
}

/** Updates this JoinedRow by updating its right base row. Returns itself. */
def withRight(newRight: Row): Row = {
row2 = newRight
this
}

def iterator = row1.iterator ++ row2.iterator

def length = row1.length + row2.length

def apply(i: Int) =
if (i < row1.size) row1(i) else row2(i - row1.size)

def isNullAt(i: Int) =
if (i < row1.size) row1.isNullAt(i) else row2.isNullAt(i - row1.size)

def getInt(i: Int): Int =
if (i < row1.size) row1.getInt(i) else row2.getInt(i - row1.size)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ object EmptyRow extends Row {
* the array is not copied, and thus could technically be mutated after creation, this is not
* allowed.
*/
class GenericRow(protected[catalyst] val values: Array[Any]) extends Row {
class GenericRow(protected[sql] val values: Array[Any]) extends Row {
/** No-arg constructor for serialization. */
def this() = this(null)

Expand Down
Loading