Skip to content

Commit

Permalink
[SPARK-2554][SQL] CountDistinct partial aggregation and object alloca…
Browse files Browse the repository at this point in the history
…tion improvements

Author: Michael Armbrust <michael@databricks.com>
Author: Gregory Owen <greowen@gmail.com>

Closes apache#1935 from marmbrus/countDistinctPartial and squashes the following commits:

5c7848d [Michael Armbrust] turn off caching in the constructor
8074a80 [Michael Armbrust] fix tests
32d216f [Michael Armbrust] reynolds comments
c122cca [Michael Armbrust] Address comments, add tests
b2e8ef3 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into countDistinctPartial
fae38f4 [Michael Armbrust] Fix style
fdca896 [Michael Armbrust] cleanup
93d0f64 [Michael Armbrust] metastore concurrency fix.
db44a30 [Michael Armbrust] JIT hax.
3868f6c [Michael Armbrust] Merge pull request #9 from GregOwen/countDistinctPartial
c9e67de [Gregory Owen] Made SpecificRow and types serializable by Kryo
2b46c4b [Michael Armbrust] Merge remote-tracking branch 'origin/master' into countDistinctPartial
8ff6402 [Michael Armbrust] Add specific row.
58d15f1 [Michael Armbrust] disable codegen logging
87d101d [Michael Armbrust] Fix isNullAt bug
abee26d [Michael Armbrust] WIP
27984d0 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into countDistinctPartial
57ae3b1 [Michael Armbrust] Fix order dependent test
b3d0f64 [Michael Armbrust] Add golden files.
c1f7114 [Michael Armbrust] Improve tests / fix serialization.
f31b8ad [Michael Armbrust] more fixes
38c7449 [Michael Armbrust] comments and style
9153652 [Michael Armbrust] better toString
d494598 [Michael Armbrust] Fix tests now that the planner is better
41fbd1d [Michael Armbrust] Never try and create an empty hash set.
050bb97 [Michael Armbrust] Skip no-arg constructors for kryo,
bd08239 [Michael Armbrust] WIP
213ada8 [Michael Armbrust] First draft of partially aggregated and code generated count distinct / max

(cherry picked from commit 7e191fe)
Signed-off-by: Michael Armbrust <michael@databricks.com>
  • Loading branch information
marmbrus committed Aug 23, 2014
1 parent 9309786 commit 7112da8
Show file tree
Hide file tree
Showing 33 changed files with 1,239 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ class InterpretedProjection(expressions: Seq[Expression]) extends Projection {
def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) =
this(expressions.map(BindReferences.bindReference(_, inputSchema)))

protected val exprArray = expressions.toArray
// null check is required for when Kryo invokes the no-arg constructor.
protected val exprArray = if (expressions != null) expressions.toArray else null

def apply(input: Row): Row = {
val outputArray = new Array[Any](exprArray.length)
Expand Down Expand Up @@ -109,7 +110,346 @@ class JoinedRow extends Row {
def apply(i: Int) =
if (i < row1.size) row1(i) else row2(i - row1.size)

def isNullAt(i: Int) = apply(i) == null
def isNullAt(i: Int) =
if (i < row1.size) row1.isNullAt(i) else row2.isNullAt(i - row1.size)

def getInt(i: Int): Int =
if (i < row1.size) row1.getInt(i) else row2.getInt(i - row1.size)

def getLong(i: Int): Long =
if (i < row1.size) row1.getLong(i) else row2.getLong(i - row1.size)

def getDouble(i: Int): Double =
if (i < row1.size) row1.getDouble(i) else row2.getDouble(i - row1.size)

def getBoolean(i: Int): Boolean =
if (i < row1.size) row1.getBoolean(i) else row2.getBoolean(i - row1.size)

def getShort(i: Int): Short =
if (i < row1.size) row1.getShort(i) else row2.getShort(i - row1.size)

def getByte(i: Int): Byte =
if (i < row1.size) row1.getByte(i) else row2.getByte(i - row1.size)

def getFloat(i: Int): Float =
if (i < row1.size) row1.getFloat(i) else row2.getFloat(i - row1.size)

def getString(i: Int): String =
if (i < row1.size) row1.getString(i) else row2.getString(i - row1.size)

def copy() = {
val totalSize = row1.size + row2.size
val copiedValues = new Array[Any](totalSize)
var i = 0
while(i < totalSize) {
copiedValues(i) = apply(i)
i += 1
}
new GenericRow(copiedValues)
}

override def toString() = {
val row = (if (row1 != null) row1 else Seq[Any]()) ++ (if (row2 != null) row2 else Seq[Any]())
s"[${row.mkString(",")}]"
}
}

/**
* JIT HACK: Replace with macros
* The `JoinedRow` class is used in many performance critical situation. Unfortunately, since there
* are multiple different types of `Rows` that could be stored as `row1` and `row2` most of the
* calls in the critical path are polymorphic. By creating special versions of this class that are
* used in only a single location of the code, we increase the chance that only a single type of
* Row will be referenced, increasing the opportunity for the JIT to play tricks. This sounds
* crazy but in benchmarks it had noticeable effects.
*/
class JoinedRow2 extends Row {
private[this] var row1: Row = _
private[this] var row2: Row = _

def this(left: Row, right: Row) = {
this()
row1 = left
row2 = right
}

/** Updates this JoinedRow to used point at two new base rows. Returns itself. */
def apply(r1: Row, r2: Row): Row = {
row1 = r1
row2 = r2
this
}

/** Updates this JoinedRow by updating its left base row. Returns itself. */
def withLeft(newLeft: Row): Row = {
row1 = newLeft
this
}

/** Updates this JoinedRow by updating its right base row. Returns itself. */
def withRight(newRight: Row): Row = {
row2 = newRight
this
}

def iterator = row1.iterator ++ row2.iterator

def length = row1.length + row2.length

def apply(i: Int) =
if (i < row1.size) row1(i) else row2(i - row1.size)

def isNullAt(i: Int) =
if (i < row1.size) row1.isNullAt(i) else row2.isNullAt(i - row1.size)

def getInt(i: Int): Int =
if (i < row1.size) row1.getInt(i) else row2.getInt(i - row1.size)

def getLong(i: Int): Long =
if (i < row1.size) row1.getLong(i) else row2.getLong(i - row1.size)

def getDouble(i: Int): Double =
if (i < row1.size) row1.getDouble(i) else row2.getDouble(i - row1.size)

def getBoolean(i: Int): Boolean =
if (i < row1.size) row1.getBoolean(i) else row2.getBoolean(i - row1.size)

def getShort(i: Int): Short =
if (i < row1.size) row1.getShort(i) else row2.getShort(i - row1.size)

def getByte(i: Int): Byte =
if (i < row1.size) row1.getByte(i) else row2.getByte(i - row1.size)

def getFloat(i: Int): Float =
if (i < row1.size) row1.getFloat(i) else row2.getFloat(i - row1.size)

def getString(i: Int): String =
if (i < row1.size) row1.getString(i) else row2.getString(i - row1.size)

def copy() = {
val totalSize = row1.size + row2.size
val copiedValues = new Array[Any](totalSize)
var i = 0
while(i < totalSize) {
copiedValues(i) = apply(i)
i += 1
}
new GenericRow(copiedValues)
}

override def toString() = {
val row = (if (row1 != null) row1 else Seq[Any]()) ++ (if (row2 != null) row2 else Seq[Any]())
s"[${row.mkString(",")}]"
}
}

/**
* JIT HACK: Replace with macros
*/
class JoinedRow3 extends Row {
private[this] var row1: Row = _
private[this] var row2: Row = _

def this(left: Row, right: Row) = {
this()
row1 = left
row2 = right
}

/** Updates this JoinedRow to used point at two new base rows. Returns itself. */
def apply(r1: Row, r2: Row): Row = {
row1 = r1
row2 = r2
this
}

/** Updates this JoinedRow by updating its left base row. Returns itself. */
def withLeft(newLeft: Row): Row = {
row1 = newLeft
this
}

/** Updates this JoinedRow by updating its right base row. Returns itself. */
def withRight(newRight: Row): Row = {
row2 = newRight
this
}

def iterator = row1.iterator ++ row2.iterator

def length = row1.length + row2.length

def apply(i: Int) =
if (i < row1.size) row1(i) else row2(i - row1.size)

def isNullAt(i: Int) =
if (i < row1.size) row1.isNullAt(i) else row2.isNullAt(i - row1.size)

def getInt(i: Int): Int =
if (i < row1.size) row1.getInt(i) else row2.getInt(i - row1.size)

def getLong(i: Int): Long =
if (i < row1.size) row1.getLong(i) else row2.getLong(i - row1.size)

def getDouble(i: Int): Double =
if (i < row1.size) row1.getDouble(i) else row2.getDouble(i - row1.size)

def getBoolean(i: Int): Boolean =
if (i < row1.size) row1.getBoolean(i) else row2.getBoolean(i - row1.size)

def getShort(i: Int): Short =
if (i < row1.size) row1.getShort(i) else row2.getShort(i - row1.size)

def getByte(i: Int): Byte =
if (i < row1.size) row1.getByte(i) else row2.getByte(i - row1.size)

def getFloat(i: Int): Float =
if (i < row1.size) row1.getFloat(i) else row2.getFloat(i - row1.size)

def getString(i: Int): String =
if (i < row1.size) row1.getString(i) else row2.getString(i - row1.size)

def copy() = {
val totalSize = row1.size + row2.size
val copiedValues = new Array[Any](totalSize)
var i = 0
while(i < totalSize) {
copiedValues(i) = apply(i)
i += 1
}
new GenericRow(copiedValues)
}

override def toString() = {
val row = (if (row1 != null) row1 else Seq[Any]()) ++ (if (row2 != null) row2 else Seq[Any]())
s"[${row.mkString(",")}]"
}
}

/**
* JIT HACK: Replace with macros
*/
class JoinedRow4 extends Row {
private[this] var row1: Row = _
private[this] var row2: Row = _

def this(left: Row, right: Row) = {
this()
row1 = left
row2 = right
}

/** Updates this JoinedRow to used point at two new base rows. Returns itself. */
def apply(r1: Row, r2: Row): Row = {
row1 = r1
row2 = r2
this
}

/** Updates this JoinedRow by updating its left base row. Returns itself. */
def withLeft(newLeft: Row): Row = {
row1 = newLeft
this
}

/** Updates this JoinedRow by updating its right base row. Returns itself. */
def withRight(newRight: Row): Row = {
row2 = newRight
this
}

def iterator = row1.iterator ++ row2.iterator

def length = row1.length + row2.length

def apply(i: Int) =
if (i < row1.size) row1(i) else row2(i - row1.size)

def isNullAt(i: Int) =
if (i < row1.size) row1.isNullAt(i) else row2.isNullAt(i - row1.size)

def getInt(i: Int): Int =
if (i < row1.size) row1.getInt(i) else row2.getInt(i - row1.size)

def getLong(i: Int): Long =
if (i < row1.size) row1.getLong(i) else row2.getLong(i - row1.size)

def getDouble(i: Int): Double =
if (i < row1.size) row1.getDouble(i) else row2.getDouble(i - row1.size)

def getBoolean(i: Int): Boolean =
if (i < row1.size) row1.getBoolean(i) else row2.getBoolean(i - row1.size)

def getShort(i: Int): Short =
if (i < row1.size) row1.getShort(i) else row2.getShort(i - row1.size)

def getByte(i: Int): Byte =
if (i < row1.size) row1.getByte(i) else row2.getByte(i - row1.size)

def getFloat(i: Int): Float =
if (i < row1.size) row1.getFloat(i) else row2.getFloat(i - row1.size)

def getString(i: Int): String =
if (i < row1.size) row1.getString(i) else row2.getString(i - row1.size)

def copy() = {
val totalSize = row1.size + row2.size
val copiedValues = new Array[Any](totalSize)
var i = 0
while(i < totalSize) {
copiedValues(i) = apply(i)
i += 1
}
new GenericRow(copiedValues)
}

override def toString() = {
val row = (if (row1 != null) row1 else Seq[Any]()) ++ (if (row2 != null) row2 else Seq[Any]())
s"[${row.mkString(",")}]"
}
}

/**
* JIT HACK: Replace with macros
*/
class JoinedRow5 extends Row {
private[this] var row1: Row = _
private[this] var row2: Row = _

def this(left: Row, right: Row) = {
this()
row1 = left
row2 = right
}

/** Updates this JoinedRow to used point at two new base rows. Returns itself. */
def apply(r1: Row, r2: Row): Row = {
row1 = r1
row2 = r2
this
}

/** Updates this JoinedRow by updating its left base row. Returns itself. */
def withLeft(newLeft: Row): Row = {
row1 = newLeft
this
}

/** Updates this JoinedRow by updating its right base row. Returns itself. */
def withRight(newRight: Row): Row = {
row2 = newRight
this
}

def iterator = row1.iterator ++ row2.iterator

def length = row1.length + row2.length

def apply(i: Int) =
if (i < row1.size) row1(i) else row2(i - row1.size)

def isNullAt(i: Int) =
if (i < row1.size) row1.isNullAt(i) else row2.isNullAt(i - row1.size)

def getInt(i: Int): Int =
if (i < row1.size) row1.getInt(i) else row2.getInt(i - row1.size)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ object EmptyRow extends Row {
* the array is not copied, and thus could technically be mutated after creation, this is not
* allowed.
*/
class GenericRow(protected[catalyst] val values: Array[Any]) extends Row {
class GenericRow(protected[sql] val values: Array[Any]) extends Row {
/** No-arg constructor for serialization. */
def this() = this(null)

Expand Down
Loading

0 comments on commit 7112da8

Please sign in to comment.