Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-2554][SQL] CountDistinct partial aggregation and object allocation improvements #1935

Closed
wants to merge 28 commits into from

Conversation

marmbrus
Copy link
Contributor

No description provided.

@SparkQA
Copy link

SparkQA commented Aug 14, 2014

QA tests have started for PR 1935. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18514/consoleFull

@SparkQA
Copy link

SparkQA commented Aug 14, 2014

QA results for PR 1935:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
abstract class MergeableAggregate extends PartialAggregate {
case class ReturnAggregate(child: AggregateExpression)
case class ReturnAggregateFunction(agg: AggregateExpression, base: AggregateExpression)
case class MergeAggregates(child: Expression)
case class MergeAggregateFunctions(expr: Expression, base: AggregateExpression)
abstract class MergableAggregateFunction extends AggregateFunction {
case class CountDistinct(expressions: Seq[Expression]) extends MergeableAggregate {
case class CountDistinctFunction(

For more information see test ouptut:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18514/consoleFull


def this() = this(null, null) // Required for serialization.

var currentValue: MergableAggregateFunction = null
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we put a default value for currentValue? And then we can ignore the null checking in function eval and update

@chenghao-intel
Copy link
Contributor

Don't forget the SumDistinct, :-)
One concern is about the memory usage after the data shuffled.
e.g. select sum(distinct(value)) from src

Probably we can improve that in another PRs.

@SparkQA
Copy link

SparkQA commented Aug 18, 2014

QA tests have started for PR 1935 at commit 9153652.

  • This patch merges cleanly.

@SparkQA
Copy link

SparkQA commented Aug 18, 2014

QA tests have finished for PR 1935 at commit 9153652.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class CountDistinct(expressions: Seq[Expression]) extends PartialAggregate
    • case class CollectHashSet(expressions: Seq[Expression]) extends AggregateExpression
    • case class CollectHashSetFunction(
    • case class CombineSetsAndCount(inputSet: Expression) extends AggregateExpression
    • case class CombineSetsAndCountFunction(
    • case class CountDistinctFunction(
    • case class MaxOf(left: Expression, right: Expression) extends Expression
    • case class NewSet(elementType: DataType) extends LeafExpression
    • case class AddItemToSet(item: Expression, set: Expression) extends Expression
    • case class CombineSets(left: Expression, right: Expression) extends BinaryExpression
    • case class CountSet(child: Expression) extends UnaryExpression

@SparkQA
Copy link

SparkQA commented Aug 18, 2014

QA tests have started for PR 1935 at commit 38c7449.

  • This patch merges cleanly.

@SparkQA
Copy link

SparkQA commented Aug 18, 2014

QA tests have finished for PR 1935 at commit 38c7449.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class CountDistinct(expressions: Seq[Expression]) extends PartialAggregate
    • case class CollectHashSet(expressions: Seq[Expression]) extends AggregateExpression
    • case class CollectHashSetFunction(
    • case class CombineSetsAndCount(inputSet: Expression) extends AggregateExpression
    • case class CombineSetsAndCountFunction(
    • case class CountDistinctFunction(
    • case class MaxOf(left: Expression, right: Expression) extends Expression
    • case class NewSet(elementType: DataType) extends LeafExpression
    • case class AddItemToSet(item: Expression, set: Expression) extends Expression
    • case class CombineSets(left: Expression, right: Expression) extends BinaryExpression
    • case class CountSet(child: Expression) extends UnaryExpression

@SparkQA
Copy link

SparkQA commented Aug 18, 2014

QA tests have started for PR 1935 at commit f31b8ad.

  • This patch merges cleanly.

@SparkQA
Copy link

SparkQA commented Aug 18, 2014

QA tests have finished for PR 1935 at commit f31b8ad.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class CountDistinct(expressions: Seq[Expression]) extends PartialAggregate
    • case class CollectHashSet(expressions: Seq[Expression]) extends AggregateExpression
    • case class CollectHashSetFunction(
    • case class CombineSetsAndCount(inputSet: Expression) extends AggregateExpression
    • case class CombineSetsAndCountFunction(
    • case class CountDistinctFunction(
    • case class MaxOf(left: Expression, right: Expression) extends Expression
    • case class NewSet(elementType: DataType) extends LeafExpression
    • case class AddItemToSet(item: Expression, set: Expression) extends Expression
    • case class CombineSets(left: Expression, right: Expression) extends BinaryExpression
    • case class CountSet(child: Expression) extends UnaryExpression


override def children = left :: right :: Nil

override def references = (left.flatMap(_.references) ++ right.flatMap(_.references)).toSet
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be left.references ++ right.references or children.flatMap(_.references).toSet ?

@marmbrus marmbrus force-pushed the countDistinctPartial branch from ae8cb53 to b2e8ef3 Compare August 20, 2014 21:20
@SparkQA
Copy link

SparkQA commented Aug 20, 2014

QA tests have started for PR 1935 at commit b2e8ef3.

  • This patch merges cleanly.

@SparkQA
Copy link

SparkQA commented Aug 20, 2014

QA tests have started for PR 1935 at commit c122cca.

  • This patch merges cleanly.


def this() = this(null, null) // Required for serialization.

val seen = new OpenHashSet[Any]()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this support null?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure, we will never put null into it though (we always put rows in, and furthermore count distinct semantics don't count null).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe add the line there explaining we never put null into it. i think the open hash set doesn't support null.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I think most HashSets don't support null. scala.collection.mutable.HashSet throws an exception if you try to add null.

@rxin
Copy link
Contributor

rxin commented Aug 20, 2014

Nice job. LGTM other than some comments. You probably want to remove WIP from the title.

@SparkQA
Copy link

SparkQA commented Aug 20, 2014

QA tests have finished for PR 1935 at commit b2e8ef3.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class JoinedRow2 extends Row
    • class JoinedRow3 extends Row
    • class JoinedRow4 extends Row
    • class JoinedRow5 extends Row
    • class GenericRow(protected[sql] val values: Array[Any]) extends Row
    • final class Mutable$tpe extends MutableValue
    • abstract class MutableValue extends Serializable
    • final class MutableInt extends MutableValue
    • final class MutableFloat extends MutableValue
    • final class MutableBoolean extends MutableValue
    • final class MutableDouble extends MutableValue
    • final class MutableShort extends MutableValue
    • final class MutableLong extends MutableValue
    • final class MutableByte extends MutableValue
    • final class MutableAny extends MutableValue
    • class SpecificMutableRow(val values: Array[MutableValue]) extends MutableRow
    • case class CountDistinct(expressions: Seq[Expression]) extends PartialAggregate
    • case class CollectHashSet(expressions: Seq[Expression]) extends AggregateExpression
    • case class CollectHashSetFunction(
    • case class CombineSetsAndCount(inputSet: Expression) extends AggregateExpression
    • case class CombineSetsAndCountFunction(
    • case class CountDistinctFunction(
    • case class MaxOf(left: Expression, right: Expression) extends Expression
    • case class NewSet(elementType: DataType) extends LeafExpression
    • case class AddItemToSet(item: Expression, set: Expression) extends Expression
    • case class CombineSets(left: Expression, right: Expression) extends BinaryExpression
    • case class CountSet(child: Expression) extends UnaryExpression

@SparkQA
Copy link

SparkQA commented Aug 20, 2014

QA tests have finished for PR 1935 at commit c122cca.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • shift # Ignore main class (org.apache.spark.deploy.SparkSubmit) and use our own
    • class JoinedRow2 extends Row
    • class JoinedRow3 extends Row
    • class JoinedRow4 extends Row
    • class JoinedRow5 extends Row
    • class GenericRow(protected[sql] val values: Array[Any]) extends Row
    • abstract class MutableValue extends Serializable
    • final class MutableInt extends MutableValue
    • final class MutableFloat extends MutableValue
    • final class MutableBoolean extends MutableValue
    • final class MutableDouble extends MutableValue
    • final class MutableShort extends MutableValue
    • final class MutableLong extends MutableValue
    • final class MutableByte extends MutableValue
    • final class MutableAny extends MutableValue
    • final class SpecificMutableRow(val values: Array[MutableValue]) extends MutableRow
    • case class CountDistinct(expressions: Seq[Expression]) extends PartialAggregate
    • case class CollectHashSet(expressions: Seq[Expression]) extends AggregateExpression
    • case class CollectHashSetFunction(
    • case class CombineSetsAndCount(inputSet: Expression) extends AggregateExpression
    • case class CombineSetsAndCountFunction(
    • case class CountDistinctFunction(
    • case class MaxOf(left: Expression, right: Expression) extends Expression
    • case class NewSet(elementType: DataType) extends LeafExpression
    • case class AddItemToSet(item: Expression, set: Expression) extends Expression
    • case class CombineSets(left: Expression, right: Expression) extends BinaryExpression
    • case class CountSet(child: Expression) extends UnaryExpression

@SparkQA
Copy link

SparkQA commented Aug 20, 2014

QA tests have started for PR 1935 at commit 8074a80.

  • This patch merges cleanly.

@SparkQA
Copy link

SparkQA commented Aug 21, 2014

QA tests have finished for PR 1935 at commit 8074a80.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • shift # Ignore main class (org.apache.spark.deploy.SparkSubmit) and use our own
    • case class SparkListenerTaskStart(stageId: Int, stageAttemptId: Int, taskInfo: TaskInfo)
    • class JoinedRow2 extends Row
    • class JoinedRow3 extends Row
    • class JoinedRow4 extends Row
    • class JoinedRow5 extends Row
    • class GenericRow(protected[sql] val values: Array[Any]) extends Row
    • abstract class MutableValue extends Serializable
    • final class MutableInt extends MutableValue
    • final class MutableFloat extends MutableValue
    • final class MutableBoolean extends MutableValue
    • final class MutableDouble extends MutableValue
    • final class MutableShort extends MutableValue
    • final class MutableLong extends MutableValue
    • final class MutableByte extends MutableValue
    • final class MutableAny extends MutableValue
    • final class SpecificMutableRow(val values: Array[MutableValue]) extends MutableRow
    • case class CountDistinct(expressions: Seq[Expression]) extends PartialAggregate
    • case class CollectHashSet(expressions: Seq[Expression]) extends AggregateExpression
    • case class CollectHashSetFunction(
    • case class CombineSetsAndCount(inputSet: Expression) extends AggregateExpression
    • case class CombineSetsAndCountFunction(
    • case class CountDistinctFunction(
    • case class MaxOf(left: Expression, right: Expression) extends Expression
    • case class NewSet(elementType: DataType) extends LeafExpression
    • case class AddItemToSet(item: Expression, set: Expression) extends Expression
    • case class CombineSets(left: Expression, right: Expression) extends BinaryExpression
    • case class CountSet(child: Expression) extends UnaryExpression

@SparkQA
Copy link

SparkQA commented Aug 23, 2014

QA tests have started for PR 1935 at commit 5c7848d.

  • This patch merges cleanly.

@SparkQA
Copy link

SparkQA commented Aug 23, 2014

QA tests have finished for PR 1935 at commit 5c7848d.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class JoinedRow2 extends Row
    • class JoinedRow3 extends Row
    • class JoinedRow4 extends Row
    • class JoinedRow5 extends Row
    • class GenericRow(protected[sql] val values: Array[Any]) extends Row
    • abstract class MutableValue extends Serializable
    • final class MutableInt extends MutableValue
    • final class MutableFloat extends MutableValue
    • final class MutableBoolean extends MutableValue
    • final class MutableDouble extends MutableValue
    • final class MutableShort extends MutableValue
    • final class MutableLong extends MutableValue
    • final class MutableByte extends MutableValue
    • final class MutableAny extends MutableValue
    • final class SpecificMutableRow(val values: Array[MutableValue]) extends MutableRow
    • case class CountDistinct(expressions: Seq[Expression]) extends PartialAggregate
    • case class CollectHashSet(expressions: Seq[Expression]) extends AggregateExpression
    • case class CollectHashSetFunction(
    • case class CombineSetsAndCount(inputSet: Expression) extends AggregateExpression
    • case class CombineSetsAndCountFunction(
    • case class CountDistinctFunction(
    • case class MaxOf(left: Expression, right: Expression) extends Expression
    • case class NewSet(elementType: DataType) extends LeafExpression
    • case class AddItemToSet(item: Expression, set: Expression) extends Expression
    • case class CombineSets(left: Expression, right: Expression) extends BinaryExpression
    • case class CountSet(child: Expression) extends UnaryExpression

@marmbrus marmbrus changed the title [WIP][SPARK-2554][SQL] CountDistinct and SumDistinct should do partial aggregation [SPARK-2554][SQL] CountDistinct and SumDistinct should do partial aggregation Aug 23, 2014
@marmbrus marmbrus changed the title [SPARK-2554][SQL] CountDistinct and SumDistinct should do partial aggregation [SPARK-2554][SQL] CountDistinct should do partial aggregation Aug 23, 2014
@marmbrus marmbrus changed the title [SPARK-2554][SQL] CountDistinct should do partial aggregation [SPARK-2554][SQL] CountDistinct partial aggregation and object allocation improvements Aug 23, 2014
@marmbrus
Copy link
Contributor Author

Thanks for looking this over! I've merged to master and 1.1

@asfgit asfgit closed this in 7e191fe Aug 23, 2014
asfgit pushed a commit that referenced this pull request Aug 23, 2014
…tion improvements

Author: Michael Armbrust <michael@databricks.com>
Author: Gregory Owen <greowen@gmail.com>

Closes #1935 from marmbrus/countDistinctPartial and squashes the following commits:

5c7848d [Michael Armbrust] turn off caching in the constructor
8074a80 [Michael Armbrust] fix tests
32d216f [Michael Armbrust] reynolds comments
c122cca [Michael Armbrust] Address comments, add tests
b2e8ef3 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into countDistinctPartial
fae38f4 [Michael Armbrust] Fix style
fdca896 [Michael Armbrust] cleanup
93d0f64 [Michael Armbrust] metastore concurrency fix.
db44a30 [Michael Armbrust] JIT hax.
3868f6c [Michael Armbrust] Merge pull request #9 from GregOwen/countDistinctPartial
c9e67de [Gregory Owen] Made SpecificRow and types serializable by Kryo
2b46c4b [Michael Armbrust] Merge remote-tracking branch 'origin/master' into countDistinctPartial
8ff6402 [Michael Armbrust] Add specific row.
58d15f1 [Michael Armbrust] disable codegen logging
87d101d [Michael Armbrust] Fix isNullAt bug
abee26d [Michael Armbrust] WIP
27984d0 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into countDistinctPartial
57ae3b1 [Michael Armbrust] Fix order dependent test
b3d0f64 [Michael Armbrust] Add golden files.
c1f7114 [Michael Armbrust] Improve tests / fix serialization.
f31b8ad [Michael Armbrust] more fixes
38c7449 [Michael Armbrust] comments and style
9153652 [Michael Armbrust] better toString
d494598 [Michael Armbrust] Fix tests now that the planner is better
41fbd1d [Michael Armbrust] Never try and create an empty hash set.
050bb97 [Michael Armbrust] Skip no-arg constructors for kryo,
bd08239 [Michael Armbrust] WIP
213ada8 [Michael Armbrust] First draft of partially aggregated and code generated count distinct / max

(cherry picked from commit 7e191fe)
Signed-off-by: Michael Armbrust <michael@databricks.com>
@marmbrus marmbrus deleted the countDistinctPartial branch August 27, 2014 20:44
xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014
…tion improvements

Author: Michael Armbrust <michael@databricks.com>
Author: Gregory Owen <greowen@gmail.com>

Closes apache#1935 from marmbrus/countDistinctPartial and squashes the following commits:

5c7848d [Michael Armbrust] turn off caching in the constructor
8074a80 [Michael Armbrust] fix tests
32d216f [Michael Armbrust] reynolds comments
c122cca [Michael Armbrust] Address comments, add tests
b2e8ef3 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into countDistinctPartial
fae38f4 [Michael Armbrust] Fix style
fdca896 [Michael Armbrust] cleanup
93d0f64 [Michael Armbrust] metastore concurrency fix.
db44a30 [Michael Armbrust] JIT hax.
3868f6c [Michael Armbrust] Merge pull request apache#9 from GregOwen/countDistinctPartial
c9e67de [Gregory Owen] Made SpecificRow and types serializable by Kryo
2b46c4b [Michael Armbrust] Merge remote-tracking branch 'origin/master' into countDistinctPartial
8ff6402 [Michael Armbrust] Add specific row.
58d15f1 [Michael Armbrust] disable codegen logging
87d101d [Michael Armbrust] Fix isNullAt bug
abee26d [Michael Armbrust] WIP
27984d0 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into countDistinctPartial
57ae3b1 [Michael Armbrust] Fix order dependent test
b3d0f64 [Michael Armbrust] Add golden files.
c1f7114 [Michael Armbrust] Improve tests / fix serialization.
f31b8ad [Michael Armbrust] more fixes
38c7449 [Michael Armbrust] comments and style
9153652 [Michael Armbrust] better toString
d494598 [Michael Armbrust] Fix tests now that the planner is better
41fbd1d [Michael Armbrust] Never try and create an empty hash set.
050bb97 [Michael Armbrust] Skip no-arg constructors for kryo,
bd08239 [Michael Armbrust] WIP
213ada8 [Michael Armbrust] First draft of partially aggregated and code generated count distinct / max
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants