From bce0ba1fbd05788f1c08549b2fd0c6a9e320a41a Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 29 Jan 2015 15:28:22 -0800 Subject: [PATCH 01/20] [SPARK-5429][SQL] Use javaXML plan serialization for Hive golden answers on Hive 0.13.1 I found that running `HiveComparisonTest.createQueryTest` to generate Hive golden answer files on Hive 0.13.1 would throw KryoException. I am not sure if this can be reproduced by others. Since Hive 0.13.0, Kryo plan serialization is introduced to replace javaXML as default plan serialization format. This is a quick fix to set hive configuration to use javaXML serialization. Author: Liang-Chi Hsieh Closes #4223 from viirya/fix_hivetest and squashes the following commits: 97a8760 [Liang-Chi Hsieh] Use javaXML plan serialization. --- .../src/main/scala/org/apache/spark/sql/hive/TestHive.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala index 822864f8ef845..7c1d1133c3425 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala @@ -68,6 +68,8 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { System.clearProperty("spark.hostPort") CommandProcessorFactory.clean(hiveconf) + hiveconf.set("hive.plan.serialization.format", "javaXML") + lazy val warehousePath = getTempFilePath("sparkHiveWarehouse").getCanonicalPath lazy val metastorePath = getTempFilePath("sparkHiveMetastore").getCanonicalPath From 940f3756116647a25fddb54111112b95ba9b8740 Mon Sep 17 00:00:00 2001 From: Michael Davies Date: Thu, 29 Jan 2015 15:40:59 -0800 Subject: [PATCH 02/20] [SPARK-5309][SQL] Add support for dictionaries in PrimitiveConverter for Strin... ...gs. Parquet Converters allow developers to take advantage of dictionary encoding of column data to reduce Column Binary decoding. The Spark PrimitiveConverter was not using that API and consequently for String columns that used dictionary compression repeated Binary to String conversions for the same String. In measurements this could account for over 25% of entire query time. For example a 500M row table split across 16 blocks was aggregated and summed in a litte under 30s before this change and a little under 20s after the change. Author: Michael Davies Closes #4187 from MickDavies/SPARK-5309-2 and squashes the following commits: 327287e [Michael Davies] SPARK-5309: Add support for dictionaries in PrimitiveConverter for Strings. 33c002c [Michael Davies] SPARK-5309: Add support for dictionaries in PrimitiveConverter for Strings. --- .../spark/sql/parquet/ParquetConverter.scala | 48 ++++++++++++++----- .../spark/sql/parquet/ParquetQuerySuite.scala | 11 +++++ 2 files changed, 47 insertions(+), 12 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala index 9d9150246c8d4..10df8c3310092 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.parquet import scala.collection.mutable.{Buffer, ArrayBuffer, HashMap} +import parquet.column.Dictionary import parquet.io.api.{PrimitiveConverter, GroupConverter, Binary, Converter} import parquet.schema.MessageType @@ -102,12 +103,8 @@ private[sql] object CatalystConverter { } // Strings, Shorts and Bytes do not have a corresponding type in Parquet // so we need to treat them separately - case StringType => { - new CatalystPrimitiveConverter(parent, fieldIndex) { - override def addBinary(value: Binary): Unit = - parent.updateString(fieldIndex, value) - } - } + case StringType => + new CatalystPrimitiveStringConverter(parent, fieldIndex) case ShortType => { new CatalystPrimitiveConverter(parent, fieldIndex) { override def addInt(value: Int): Unit = @@ -197,8 +194,8 @@ private[parquet] abstract class CatalystConverter extends GroupConverter { protected[parquet] def updateBinary(fieldIndex: Int, value: Binary): Unit = updateField(fieldIndex, value.getBytes) - protected[parquet] def updateString(fieldIndex: Int, value: Binary): Unit = - updateField(fieldIndex, value.toStringUsingUTF8) + protected[parquet] def updateString(fieldIndex: Int, value: String): Unit = + updateField(fieldIndex, value) protected[parquet] def updateDecimal(fieldIndex: Int, value: Binary, ctype: DecimalType): Unit = { updateField(fieldIndex, readDecimal(new Decimal(), value, ctype)) @@ -384,8 +381,8 @@ private[parquet] class CatalystPrimitiveRowConverter( override protected[parquet] def updateBinary(fieldIndex: Int, value: Binary): Unit = current.update(fieldIndex, value.getBytes) - override protected[parquet] def updateString(fieldIndex: Int, value: Binary): Unit = - current.setString(fieldIndex, value.toStringUsingUTF8) + override protected[parquet] def updateString(fieldIndex: Int, value: String): Unit = + current.setString(fieldIndex, value) override protected[parquet] def updateDecimal( fieldIndex: Int, value: Binary, ctype: DecimalType): Unit = { @@ -426,6 +423,33 @@ private[parquet] class CatalystPrimitiveConverter( parent.updateLong(fieldIndex, value) } +/** + * A `parquet.io.api.PrimitiveConverter` that converts Parquet Binary to Catalyst String. + * Supports dictionaries to reduce Binary to String conversion overhead. + * + * Follows pattern in Parquet of using dictionaries, where supported, for String conversion. + * + * @param parent The parent group converter. + * @param fieldIndex The index inside the record. + */ +private[parquet] class CatalystPrimitiveStringConverter(parent: CatalystConverter, fieldIndex: Int) + extends CatalystPrimitiveConverter(parent, fieldIndex) { + + private[this] var dict: Array[String] = null + + override def hasDictionarySupport: Boolean = true + + override def setDictionary(dictionary: Dictionary):Unit = + dict = Array.tabulate(dictionary.getMaxId + 1) {dictionary.decodeToBinary(_).toStringUsingUTF8} + + + override def addValueFromDictionary(dictionaryId: Int): Unit = + parent.updateString(fieldIndex, dict(dictionaryId)) + + override def addBinary(value: Binary): Unit = + parent.updateString(fieldIndex, value.toStringUsingUTF8) +} + private[parquet] object CatalystArrayConverter { val INITIAL_ARRAY_SIZE = 20 } @@ -583,9 +607,9 @@ private[parquet] class CatalystNativeArrayConverter( elements += 1 } - override protected[parquet] def updateString(fieldIndex: Int, value: Binary): Unit = { + override protected[parquet] def updateString(fieldIndex: Int, value: String): Unit = { checkGrowBuffer() - buffer(elements) = value.toStringUsingUTF8.asInstanceOf[NativeType] + buffer(elements) = value.asInstanceOf[NativeType] elements += 1 } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala index 1263ff818ea19..3d82f4bce7778 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala @@ -85,4 +85,15 @@ class ParquetQuerySuite extends QueryTest with ParquetTest { checkAnswer(sql(s"SELECT _1 FROM t WHERE _1 < 10"), (1 to 9).map(Row.apply(_))) } } + + test("SPARK-5309 strings stored using dictionary compression in parquet") { + withParquetTable((0 until 1000).map(i => ("same", "run_" + i /100, 1)), "t") { + + checkAnswer(sql(s"SELECT _1, _2, SUM(_3) FROM t GROUP BY _1, _2"), + (0 until 10).map(i => Row("same", "run_" + i, 100))) + + checkAnswer(sql(s"SELECT _1, _2, SUM(_3) FROM t WHERE _2 = 'run_5' GROUP BY _1, _2"), + List(Row("same", "run_5", 100))) + } + } } From de221ea03288fb9fb7c14530425f4a9414b1088f Mon Sep 17 00:00:00 2001 From: Yash Datta Date: Thu, 29 Jan 2015 15:42:23 -0800 Subject: [PATCH 03/20] [SPARK-4786][SQL]: Parquet filter pushdown for castable types Enable parquet filter pushdown of castable types like short, byte that can be cast to integer Author: Yash Datta Closes #4156 from saucam/filter_short and squashes the following commits: a403979 [Yash Datta] SPARK-4786: Fix styling issues d029866 [Yash Datta] SPARK-4786: Add test case cb2e0d9 [Yash Datta] SPARK-4786: Parquet filter pushdown for castable types --- .../spark/sql/parquet/ParquetFilters.scala | 26 +++++++++++++++++- .../sql/parquet/ParquetFilterSuite.scala | 27 ++++++++++++++++++- 2 files changed, 51 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala index f08350878f239..0357dcc4688be 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala @@ -164,33 +164,57 @@ private[sql] object ParquetFilters { case EqualTo(NamedExpression(name, _), NonNullLiteral(value, dataType)) => makeEq.lift(dataType).map(_(name, value)) + case EqualTo(Cast(NamedExpression(name, _), dataType), NonNullLiteral(value, _)) => + makeEq.lift(dataType).map(_(name, value)) case EqualTo(NonNullLiteral(value, dataType), NamedExpression(name, _)) => makeEq.lift(dataType).map(_(name, value)) - + case EqualTo(NonNullLiteral(value, _), Cast(NamedExpression(name, _), dataType)) => + makeEq.lift(dataType).map(_(name, value)) + case Not(EqualTo(NamedExpression(name, _), NonNullLiteral(value, dataType))) => makeNotEq.lift(dataType).map(_(name, value)) + case Not(EqualTo(Cast(NamedExpression(name, _), dataType), NonNullLiteral(value, _))) => + makeNotEq.lift(dataType).map(_(name, value)) case Not(EqualTo(NonNullLiteral(value, dataType), NamedExpression(name, _))) => makeNotEq.lift(dataType).map(_(name, value)) + case Not(EqualTo(NonNullLiteral(value, _), Cast(NamedExpression(name, _), dataType))) => + makeNotEq.lift(dataType).map(_(name, value)) case LessThan(NamedExpression(name, _), NonNullLiteral(value, dataType)) => makeLt.lift(dataType).map(_(name, value)) + case LessThan(Cast(NamedExpression(name, _), dataType), NonNullLiteral(value, _)) => + makeLt.lift(dataType).map(_(name, value)) case LessThan(NonNullLiteral(value, dataType), NamedExpression(name, _)) => makeGt.lift(dataType).map(_(name, value)) + case LessThan(NonNullLiteral(value, _), Cast(NamedExpression(name, _), dataType)) => + makeGt.lift(dataType).map(_(name, value)) case LessThanOrEqual(NamedExpression(name, _), NonNullLiteral(value, dataType)) => makeLtEq.lift(dataType).map(_(name, value)) + case LessThanOrEqual(Cast(NamedExpression(name, _), dataType), NonNullLiteral(value, _)) => + makeLtEq.lift(dataType).map(_(name, value)) case LessThanOrEqual(NonNullLiteral(value, dataType), NamedExpression(name, _)) => makeGtEq.lift(dataType).map(_(name, value)) + case LessThanOrEqual(NonNullLiteral(value, _), Cast(NamedExpression(name, _), dataType)) => + makeGtEq.lift(dataType).map(_(name, value)) case GreaterThan(NamedExpression(name, _), NonNullLiteral(value, dataType)) => makeGt.lift(dataType).map(_(name, value)) + case GreaterThan(Cast(NamedExpression(name, _), dataType), NonNullLiteral(value, _)) => + makeGt.lift(dataType).map(_(name, value)) case GreaterThan(NonNullLiteral(value, dataType), NamedExpression(name, _)) => makeLt.lift(dataType).map(_(name, value)) + case GreaterThan(NonNullLiteral(value, _), Cast(NamedExpression(name, _), dataType)) => + makeLt.lift(dataType).map(_(name, value)) case GreaterThanOrEqual(NamedExpression(name, _), NonNullLiteral(value, dataType)) => makeGtEq.lift(dataType).map(_(name, value)) + case GreaterThanOrEqual(Cast(NamedExpression(name, _), dataType), NonNullLiteral(value, _)) => + makeGtEq.lift(dataType).map(_(name, value)) case GreaterThanOrEqual(NonNullLiteral(value, dataType), NamedExpression(name, _)) => makeLtEq.lift(dataType).map(_(name, value)) + case GreaterThanOrEqual(NonNullLiteral(value, _), Cast(NamedExpression(name, _), dataType)) => + makeLtEq.lift(dataType).map(_(name, value)) case And(lhs, rhs) => (createFilter(lhs) ++ createFilter(rhs)).reduceOption(FilterApi.and) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala index c9bc55900de98..e78145f4dda5a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala @@ -21,7 +21,8 @@ import parquet.filter2.predicate.Operators._ import parquet.filter2.predicate.{FilterPredicate, Operators} import org.apache.spark.sql.catalyst.dsl.expressions._ -import org.apache.spark.sql.catalyst.expressions.{Attribute, Literal, Predicate, Row} +import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, Predicate, Row} +import org.apache.spark.sql.types._ import org.apache.spark.sql.test.TestSQLContext import org.apache.spark.sql.{DataFrame, QueryTest, SQLConf} @@ -93,6 +94,30 @@ class ParquetFilterSuite extends QueryTest with ParquetTest { } } + test("filter pushdown - short") { + withParquetRDD((1 to 4).map(i => Tuple1(Option(i.toShort)))) { implicit rdd => + checkFilterPredicate(Cast('_1, IntegerType) === 1, classOf[Eq [_]], 1) + checkFilterPredicate(Cast('_1, IntegerType) !== 1, classOf[NotEq[_]], (2 to 4).map(Row.apply(_))) + + checkFilterPredicate(Cast('_1, IntegerType) < 2, classOf[Lt [_]], 1) + checkFilterPredicate(Cast('_1, IntegerType) > 3, classOf[Gt [_]], 4) + checkFilterPredicate(Cast('_1, IntegerType) <= 1, classOf[LtEq[_]], 1) + checkFilterPredicate(Cast('_1, IntegerType) >= 4, classOf[GtEq[_]], 4) + + checkFilterPredicate(Literal(1) === Cast('_1, IntegerType), classOf[Eq [_]], 1) + checkFilterPredicate(Literal(2) > Cast('_1, IntegerType), classOf[Lt [_]], 1) + checkFilterPredicate(Literal(3) < Cast('_1, IntegerType), classOf[Gt [_]], 4) + checkFilterPredicate(Literal(1) >= Cast('_1, IntegerType), classOf[LtEq[_]], 1) + checkFilterPredicate(Literal(4) <= Cast('_1, IntegerType), classOf[GtEq[_]], 4) + + checkFilterPredicate(!(Cast('_1, IntegerType) < 4), classOf[GtEq[_]], 4) + checkFilterPredicate(Cast('_1, IntegerType) > 2 && Cast('_1, IntegerType) < 4, + classOf[Operators.And], 3) + checkFilterPredicate(Cast('_1, IntegerType) < 2 || Cast('_1, IntegerType) > 3, + classOf[Operators.Or], Seq(Row(1), Row(4))) + } + } + test("filter pushdown - integer") { withParquetRDD((1 to 4).map(i => Tuple1(Option(i)))) { implicit rdd => checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row]) From fbaf9e08961551d3ae5c3629eca01e839b001b8e Mon Sep 17 00:00:00 2001 From: wangfei Date: Thu, 29 Jan 2015 15:44:53 -0800 Subject: [PATCH 04/20] [SPARK-5367][SQL] Support star expression in udf now spark sql does not support star expression in udf, run the following sql by spark-sql will get error ``` select concat(*) from src ``` Author: wangfei Author: scwf Closes #4163 from scwf/udf-star and squashes the following commits: 9db7b39 [wangfei] addressed comments da1da09 [scwf] minor fix f87b5f9 [scwf] added test case 587bf7e [wangfei] compile fix eb93c16 [wangfei] fix star resolve issue in udf --- .../spark/sql/catalyst/analysis/Analyzer.scala | 15 ++++++++++----- .../spark/sql/hive/execution/HiveQuerySuite.scala | 5 +++++ 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 7f4cc234dc9cd..cefd70acf3931 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -250,6 +250,12 @@ class Analyzer(catalog: Catalog, Project( projectList.flatMap { case s: Star => s.expand(child.output, resolver) + case Alias(f @ UnresolvedFunction(_, args), name) if containsStar(args) => + val expandedArgs = args.flatMap { + case s: Star => s.expand(child.output, resolver) + case o => o :: Nil + } + Alias(child = f.copy(children = expandedArgs), name)() :: Nil case o => o :: Nil }, child) @@ -273,10 +279,9 @@ class Analyzer(catalog: Catalog, case q: LogicalPlan => logTrace(s"Attempting to resolve ${q.simpleString}") q transformExpressions { - case u @ UnresolvedAttribute(name) - if resolver(name, VirtualColumn.groupingIdName) && - q.isInstanceOf[GroupingAnalytics] => - // Resolve the virtual column GROUPING__ID for the operator GroupingAnalytics + case u @ UnresolvedAttribute(name) if resolver(name, VirtualColumn.groupingIdName) && + q.isInstanceOf[GroupingAnalytics] => + // Resolve the virtual column GROUPING__ID for the operator GroupingAnalytics q.asInstanceOf[GroupingAnalytics].gid case u @ UnresolvedAttribute(name) => // Leave unchanged if resolution fails. Hopefully will be resolved next round. @@ -299,7 +304,7 @@ class Analyzer(catalog: Catalog, * Returns true if `exprs` contains a [[Star]]. */ protected def containsStar(exprs: Seq[Expression]): Boolean = - exprs.collect { case _: Star => true}.nonEmpty + exprs.exists(_.collect { case _: Star => true }.nonEmpty) } /** diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 42819e3584440..60619f5d99578 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -509,6 +509,11 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { assert(sql("select key from src having key > 490").collect().size < 100) } + test("SPARK-5367: resolve star expression in udf") { + assert(sql("select concat(*) from src limit 5").collect().size == 5) + assert(sql("select array(*) from src limit 5").collect().size == 5) + } + test("Query Hive native command execution result") { val tableName = "test_native_commands" From c1b3eebf97b986439f71afd3c4eccf47b90da2cd Mon Sep 17 00:00:00 2001 From: wangfei Date: Thu, 29 Jan 2015 15:47:13 -0800 Subject: [PATCH 05/20] [SPARK-5373][SQL] Literal in agg grouping expressions leads to incorrect result `select key, count( * ) from src group by key, 1` will get the wrong answer. e.g. for this table ``` val testData2 = TestSQLContext.sparkContext.parallelize( TestData2(1, 1) :: TestData2(1, 2) :: TestData2(2, 1) :: TestData2(2, 2) :: TestData2(3, 1) :: TestData2(3, 2) :: Nil, 2).toSchemaRDD testData2.registerTempTable("testData2") ``` result of `SELECT a, count(1) FROM testData2 GROUP BY a, 1` is ``` [1,1] [2,2] [3,1] ``` Author: wangfei Closes #4169 from scwf/agg-bug and squashes the following commits: 05751db [wangfei] fix bugs when literal in agg grouping expressioons --- .../apache/spark/sql/catalyst/planning/patterns.scala | 9 +++++---- .../test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 9 +++++++++ 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala index 310d127506d68..b4c445b3badf1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala @@ -141,10 +141,11 @@ object PartialAggregation { // We need to pass all grouping expressions though so the grouping can happen a second // time. However some of them might be unnamed so we alias them allowing them to be // referenced in the second aggregation. - val namedGroupingExpressions: Map[Expression, NamedExpression] = groupingExpressions.map { - case n: NamedExpression => (n, n) - case other => (other, Alias(other, "PartialGroup")()) - }.toMap + val namedGroupingExpressions: Map[Expression, NamedExpression] = + groupingExpressions.filter(!_.isInstanceOf[Literal]).map { + case n: NamedExpression => (n, n) + case other => (other, Alias(other, "PartialGroup")()) + }.toMap // Replace aggregations with a new expression that computes the result from the already // computed partial evaluations and grouping values. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index e03444d4969d7..d684278f11bcb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -186,6 +186,15 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { Seq(Row(1,3), Row(2,3), Row(3,3))) } + test("literal in agg grouping expressions") { + checkAnswer( + sql("SELECT a, count(1) FROM testData2 GROUP BY a, 1"), + Seq(Row(1,2), Row(2,2), Row(3,2))) + checkAnswer( + sql("SELECT a, count(2) FROM testData2 GROUP BY a, 2"), + Seq(Row(1,2), Row(2,2), Row(3,2))) + } + test("aggregates with nulls") { checkAnswer( sql("SELECT MIN(a), MAX(a), AVG(a), SUM(a), COUNT(a) FROM nullInts"), From c00d517d660ddc3c7b4302651e5567534a819905 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Thu, 29 Jan 2015 15:49:34 -0800 Subject: [PATCH 06/20] [SPARK-4296][SQL] Trims aliases when resolving and checking aggregate expressions I believe that SPARK-4296 has been fixed by 3684fd21e1ffdc0adaad8ff6b31394b637e866ce. I am adding tests based #3910 (change the udf to HiveUDF instead). Author: Yin Huai Author: Cheng Lian Closes #4010 from yhuai/SPARK-4296-yin and squashes the following commits: 6343800 [Yin Huai] Merge remote-tracking branch 'upstream/master' into SPARK-4296-yin 6cfadd2 [Yin Huai] Actually, this issue has been fixed by 3684fd21e1ffdc0adaad8ff6b31394b637e866ce. d42b707 [Yin Huai] Update comment. 8b3a274 [Yin Huai] Since expressions in grouping expressions can have aliases, which can be used by the outer query block, revert this change. 443538d [Cheng Lian] Trims aliases when resolving and checking aggregate expressions --- .../spark/sql/hive/execution/SQLQuerySuite.scala | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index faa7357b906c8..eb7a7750af02d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -267,4 +267,19 @@ class SQLQuerySuite extends QueryTest { sql("DROP TABLE nullValuesInInnerComplexTypes") dropTempTable("testTable") } + + test("SPARK-4296 Grouping field with Hive UDF as sub expression") { + val rdd = sparkContext.makeRDD( """{"a": "str", "b":"1", "c":"1970-01-01 00:00:00"}""" :: Nil) + jsonRDD(rdd).registerTempTable("data") + checkAnswer( + sql("SELECT concat(a, '-', b), year(c) FROM data GROUP BY concat(a, '-', b), year(c)"), + Row("str-1", 1970)) + + dropTempTable("data") + + jsonRDD(rdd).registerTempTable("data") + checkAnswer(sql("SELECT year(c) + 1 FROM data GROUP BY year(c) + 1"), Row(1971)) + + dropTempTable("data") + } } From 0bb15f22d1694d3ac0476eb14142b1b1cc781690 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 29 Jan 2015 16:23:20 -0800 Subject: [PATCH 07/20] [SPARK-5464] Fix help() for Python DataFrame instances This fixes an exception that prevented users from calling `help()` on Python DataFrame instances. Author: Josh Rosen Closes #4278 from JoshRosen/SPARK-5464-python-dataframe-help-command and squashes the following commits: 08f95f7 [Josh Rosen] Fix exception when calling help() on Python DataFrame instances --- python/pyspark/sql.py | 6 +++--- python/pyspark/tests.py | 10 ++++++++++ 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py index e636f992ec99b..3f2d7ac82585f 100644 --- a/python/pyspark/sql.py +++ b/python/pyspark/sql.py @@ -2136,9 +2136,9 @@ def __getitem__(self, item): def __getattr__(self, name): """ Return the column by given name """ - if isinstance(name, basestring): - return Column(self._jdf.apply(name)) - raise AttributeError + if name.startswith("__"): + raise AttributeError(name) + return Column(self._jdf.apply(name)) def alias(self, name): """ Alias the current DataFrame """ diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 081a77fbb0be2..bec1961f26393 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -23,6 +23,7 @@ from fileinput import input from glob import glob import os +import pydoc import re import shutil import subprocess @@ -1032,6 +1033,15 @@ def test_aggregator(self): from pyspark.sql import Aggregator as Agg # self.assertEqual((0, '100'), tuple(g.agg(Agg.first(df.key), Agg.last(df.value)).first())) + def test_help_command(self): + # Regression test for SPARK-5464 + rdd = self.sc.parallelize(['{"foo":"bar"}', '{"foo":"baz"}']) + df = self.sqlCtx.jsonRDD(rdd) + # render_doc() reproduces the help() exception without printing output + pydoc.render_doc(df) + pydoc.render_doc(df.foo) + pydoc.render_doc(df.take(1)) + class InputFormatTests(ReusedPySparkTestCase): From f240fe390b46b6e9859ce74108c5a5fba5c5f8b3 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Thu, 29 Jan 2015 16:31:19 -0800 Subject: [PATCH 08/20] [WIP] [SPARK-3996]: Shade Jetty in Spark deliverables This patch piggy-back's on vanzin's work to simplify the Guava shading, and adds Jetty as a shaded library in Spark. Other than adding Jetty, it consilidates the \'s into the root pom. I found it was a bit easier to follow that way, since you don't need to look into child pom's to find out specific artifact sets included in shading. Author: Patrick Wendell Closes #4252 from pwendell/jetty and squashes the following commits: 19f0710 [Patrick Wendell] More code review feedback 961452d [Patrick Wendell] Responding to feedback from Marcello 6df25ca [Patrick Wendell] [WIP] [SPARK-3996]: Shade Jetty in Spark deliverables --- bin/compute-classpath.sh | 4 +++- core/pom.xml | 22 ++++++++++++++++++++-- network/common/pom.xml | 12 ------------ pom.xml | 32 ++++++++++++++++++++++++++++++++ 4 files changed, 55 insertions(+), 15 deletions(-) diff --git a/bin/compute-classpath.sh b/bin/compute-classpath.sh index 9e8d0b785194e..a8c344b1ca594 100755 --- a/bin/compute-classpath.sh +++ b/bin/compute-classpath.sh @@ -50,8 +50,8 @@ fi if [ -n "$SPARK_PREPEND_CLASSES" ]; then echo "NOTE: SPARK_PREPEND_CLASSES is set, placing locally compiled Spark"\ "classes ahead of assembly." >&2 + # Spark classes CLASSPATH="$CLASSPATH:$FWDIR/core/target/scala-$SPARK_SCALA_VERSION/classes" - CLASSPATH="$CLASSPATH:$FWDIR/core/target/jars/*" CLASSPATH="$CLASSPATH:$FWDIR/repl/target/scala-$SPARK_SCALA_VERSION/classes" CLASSPATH="$CLASSPATH:$FWDIR/mllib/target/scala-$SPARK_SCALA_VERSION/classes" CLASSPATH="$CLASSPATH:$FWDIR/bagel/target/scala-$SPARK_SCALA_VERSION/classes" @@ -63,6 +63,8 @@ if [ -n "$SPARK_PREPEND_CLASSES" ]; then CLASSPATH="$CLASSPATH:$FWDIR/sql/hive/target/scala-$SPARK_SCALA_VERSION/classes" CLASSPATH="$CLASSPATH:$FWDIR/sql/hive-thriftserver/target/scala-$SPARK_SCALA_VERSION/classes" CLASSPATH="$CLASSPATH:$FWDIR/yarn/stable/target/scala-$SPARK_SCALA_VERSION/classes" + # Jars for shaded deps in their original form (copied here during build) + CLASSPATH="$CLASSPATH:$FWDIR/core/target/jars/*" fi # Use spark-assembly jar from either RELEASE or assembly directory diff --git a/core/pom.xml b/core/pom.xml index 31e919a1c831a..d91f4ee0241ac 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -94,22 +94,35 @@ org.apache.curator curator-recipes + + org.eclipse.jetty jetty-plus + compile org.eclipse.jetty jetty-security + compile org.eclipse.jetty jetty-util + compile org.eclipse.jetty jetty-server + compile + + org.eclipse.jetty + jetty-http + compile + + org.apache.commons commons-lang3 @@ -348,19 +361,24 @@ org.apache.maven.plugins maven-dependency-plugin + copy-dependencies package copy-dependencies - + ${project.build.directory} false false true true - guava + + guava,jetty-io,jetty-http,jetty-plus,jetty-util,jetty-server + true diff --git a/network/common/pom.xml b/network/common/pom.xml index 5a9bbe105d9f1..8f7c924d6b3a3 100644 --- a/network/common/pom.xml +++ b/network/common/pom.xml @@ -101,18 +101,6 @@ - - org.apache.maven.plugins - maven-shade-plugin - - false - - - com.google.guava:guava - - - - diff --git a/pom.xml b/pom.xml index 4adfdf3eb8702..63c0a2af9e021 100644 --- a/pom.xml +++ b/pom.xml @@ -337,25 +337,39 @@ + + + + org.eclipse.jetty + jetty-http + ${jetty.version} + provided + org.eclipse.jetty jetty-util ${jetty.version} + provided org.eclipse.jetty jetty-security ${jetty.version} + provided org.eclipse.jetty jetty-plus ${jetty.version} + provided org.eclipse.jetty jetty-server ${jetty.version} + provided com.google.guava @@ -363,6 +377,8 @@ 14.0.1 provided + + org.apache.commons commons-lang3 @@ -1276,10 +1292,26 @@ false + org.spark-project.spark:unused + + org.eclipse.jetty:jetty-io + org.eclipse.jetty:jetty-http + org.eclipse.jetty:jetty-plus + org.eclipse.jetty:jetty-security + org.eclipse.jetty:jetty-util + org.eclipse.jetty:jetty-server + com.google.guava:guava + + org.eclipse.jetty + org.spark-project.jetty + + org.eclipse.jetty.** + + com.google.common org.spark-project.guava From 5338772f3fe9cfe1f8caee64cce2275457d8f23f Mon Sep 17 00:00:00 2001 From: Yoshihiro Shimizu Date: Thu, 29 Jan 2015 16:55:00 -0800 Subject: [PATCH 09/20] remove 'return' looks unnecessary :grinning: Author: Yoshihiro Shimizu Closes #4268 from y-shimizu/remove-return and squashes the following commits: 12be0e9 [Yoshihiro Shimizu] remove 'return' --- .../src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala index 567a8a6c03d90..8f75e6f46e05d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala @@ -78,7 +78,7 @@ sealed trait Vector extends Serializable { result = 31 * result + (bits ^ (bits >>> 32)).toInt } } - return result + result } /** From d2071e8f45e74117f78a42770b0c610cb98e5075 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Thu, 29 Jan 2015 17:14:27 -0800 Subject: [PATCH 10/20] Revert "[WIP] [SPARK-3996]: Shade Jetty in Spark deliverables" This reverts commit f240fe390b46b6e9859ce74108c5a5fba5c5f8b3. --- bin/compute-classpath.sh | 4 +--- core/pom.xml | 22 ++-------------------- network/common/pom.xml | 12 ++++++++++++ pom.xml | 32 -------------------------------- 4 files changed, 15 insertions(+), 55 deletions(-) diff --git a/bin/compute-classpath.sh b/bin/compute-classpath.sh index a8c344b1ca594..9e8d0b785194e 100755 --- a/bin/compute-classpath.sh +++ b/bin/compute-classpath.sh @@ -50,8 +50,8 @@ fi if [ -n "$SPARK_PREPEND_CLASSES" ]; then echo "NOTE: SPARK_PREPEND_CLASSES is set, placing locally compiled Spark"\ "classes ahead of assembly." >&2 - # Spark classes CLASSPATH="$CLASSPATH:$FWDIR/core/target/scala-$SPARK_SCALA_VERSION/classes" + CLASSPATH="$CLASSPATH:$FWDIR/core/target/jars/*" CLASSPATH="$CLASSPATH:$FWDIR/repl/target/scala-$SPARK_SCALA_VERSION/classes" CLASSPATH="$CLASSPATH:$FWDIR/mllib/target/scala-$SPARK_SCALA_VERSION/classes" CLASSPATH="$CLASSPATH:$FWDIR/bagel/target/scala-$SPARK_SCALA_VERSION/classes" @@ -63,8 +63,6 @@ if [ -n "$SPARK_PREPEND_CLASSES" ]; then CLASSPATH="$CLASSPATH:$FWDIR/sql/hive/target/scala-$SPARK_SCALA_VERSION/classes" CLASSPATH="$CLASSPATH:$FWDIR/sql/hive-thriftserver/target/scala-$SPARK_SCALA_VERSION/classes" CLASSPATH="$CLASSPATH:$FWDIR/yarn/stable/target/scala-$SPARK_SCALA_VERSION/classes" - # Jars for shaded deps in their original form (copied here during build) - CLASSPATH="$CLASSPATH:$FWDIR/core/target/jars/*" fi # Use spark-assembly jar from either RELEASE or assembly directory diff --git a/core/pom.xml b/core/pom.xml index d91f4ee0241ac..31e919a1c831a 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -94,35 +94,22 @@ org.apache.curator curator-recipes - - org.eclipse.jetty jetty-plus - compile org.eclipse.jetty jetty-security - compile org.eclipse.jetty jetty-util - compile org.eclipse.jetty jetty-server - compile - - org.eclipse.jetty - jetty-http - compile - - org.apache.commons commons-lang3 @@ -361,24 +348,19 @@ org.apache.maven.plugins maven-dependency-plugin - copy-dependencies package copy-dependencies - + ${project.build.directory} false false true true - - guava,jetty-io,jetty-http,jetty-plus,jetty-util,jetty-server - + guava true diff --git a/network/common/pom.xml b/network/common/pom.xml index 8f7c924d6b3a3..5a9bbe105d9f1 100644 --- a/network/common/pom.xml +++ b/network/common/pom.xml @@ -101,6 +101,18 @@ + + org.apache.maven.plugins + maven-shade-plugin + + false + + + com.google.guava:guava + + + + diff --git a/pom.xml b/pom.xml index 63c0a2af9e021..4adfdf3eb8702 100644 --- a/pom.xml +++ b/pom.xml @@ -337,39 +337,25 @@ - - - - org.eclipse.jetty - jetty-http - ${jetty.version} - provided - org.eclipse.jetty jetty-util ${jetty.version} - provided org.eclipse.jetty jetty-security ${jetty.version} - provided org.eclipse.jetty jetty-plus ${jetty.version} - provided org.eclipse.jetty jetty-server ${jetty.version} - provided com.google.guava @@ -377,8 +363,6 @@ 14.0.1 provided - - org.apache.commons commons-lang3 @@ -1292,26 +1276,10 @@ false - org.spark-project.spark:unused - - org.eclipse.jetty:jetty-io - org.eclipse.jetty:jetty-http - org.eclipse.jetty:jetty-plus - org.eclipse.jetty:jetty-security - org.eclipse.jetty:jetty-util - org.eclipse.jetty:jetty-server - com.google.guava:guava - - org.eclipse.jetty - org.spark-project.jetty - - org.eclipse.jetty.** - - com.google.common org.spark-project.guava From ce9c43ba8ca1ba6507fd3bf3c647ab7396d33653 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Thu, 29 Jan 2015 17:24:00 -0800 Subject: [PATCH 11/20] [SQL] DataFrame API improvements 1. Added Dsl.column in case Dsl.col is shadowed. 2. Allow using String to specify the target data type in cast. 3. Support sorting on multiple columns using column names. 4. Added Java API test file. Author: Reynold Xin Closes #4280 from rxin/dsl1 and squashes the following commits: 33ecb7a [Reynold Xin] Add the Java test. d06540a [Reynold Xin] [SQL] DataFrame API improvements. --- .../scala/org/apache/spark/sql/Column.scala | 35 ++++- .../org/apache/spark/sql/DataFrame.scala | 26 +++- .../main/scala/org/apache/spark/sql/Dsl.scala | 6 + .../apache/spark/sql/GroupedDataFrame.scala | 27 +++- .../main/scala/org/apache/spark/sql/api.scala | 11 +- .../apache/spark/sql/api/java/JavaDsl.java | 120 ++++++++++++++++++ 6 files changed, 209 insertions(+), 16 deletions(-) create mode 100644 sql/core/src/test/java/org/apache/spark/sql/api/java/JavaDsl.java diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala index ca50fd6f05867..68c9cb0c02018 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala @@ -56,7 +56,7 @@ object Column { class Column( sqlContext: Option[SQLContext], plan: Option[LogicalPlan], - val expr: Expression) + protected[sql] val expr: Expression) extends DataFrame(sqlContext, plan) with ExpressionApi { /** Turns a Catalyst expression into a `Column`. */ @@ -437,9 +437,7 @@ class Column( override def rlike(literal: String): Column = RLike(expr, lit(literal).expr) /** - * An expression that gets an - * @param ordinal - * @return + * An expression that gets an item at position `ordinal` out of an array. */ override def getItem(ordinal: Int): Column = GetItem(expr, Literal(ordinal)) @@ -490,11 +488,38 @@ class Column( * {{{ * // Casts colA to IntegerType. * import org.apache.spark.sql.types.IntegerType - * df.select(df("colA").as(IntegerType)) + * df.select(df("colA").cast(IntegerType)) + * + * // equivalent to + * df.select(df("colA").cast("int")) * }}} */ override def cast(to: DataType): Column = Cast(expr, to) + /** + * Casts the column to a different data type, using the canonical string representation + * of the type. The supported types are: `string`, `boolean`, `byte`, `short`, `int`, `long`, + * `float`, `double`, `decimal`, `date`, `timestamp`. + * {{{ + * // Casts colA to integer. + * df.select(df("colA").cast("int")) + * }}} + */ + override def cast(to: String): Column = Cast(expr, to.toLowerCase match { + case "string" => StringType + case "boolean" => BooleanType + case "byte" => ByteType + case "short" => ShortType + case "int" => IntegerType + case "long" => LongType + case "float" => FloatType + case "double" => DoubleType + case "decimal" => DecimalType.Unlimited + case "date" => DateType + case "timestamp" => TimestampType + case _ => throw new RuntimeException(s"""Unsupported cast type: "$to"""") + }) + override def desc: Column = SortOrder(expr, Descending) override def asc: Column = SortOrder(expr, Ascending) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 94c13a5c26678..1ff25adcf836a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -208,7 +208,7 @@ class DataFrame protected[sql]( } /** - * Returns a new [[DataFrame]] sorted by the specified column, in ascending column. + * Returns a new [[DataFrame]] sorted by the specified column, all in ascending order. * {{{ * // The following 3 are equivalent * df.sort("sortcol") @@ -216,8 +216,9 @@ class DataFrame protected[sql]( * df.sort($"sortcol".asc) * }}} */ - override def sort(colName: String): DataFrame = { - Sort(Seq(SortOrder(apply(colName).expr, Ascending)), global = true, logicalPlan) + @scala.annotation.varargs + override def sort(sortCol: String, sortCols: String*): DataFrame = { + orderBy(apply(sortCol), sortCols.map(apply) :_*) } /** @@ -239,6 +240,15 @@ class DataFrame protected[sql]( Sort(sortOrder, global = true, logicalPlan) } + /** + * Returns a new [[DataFrame]] sorted by the given expressions. + * This is an alias of the `sort` function. + */ + @scala.annotation.varargs + override def orderBy(sortCol: String, sortCols: String*): DataFrame = { + sort(sortCol, sortCols :_*) + } + /** * Returns a new [[DataFrame]] sorted by the given expressions. * This is an alias of the `sort` function. @@ -401,6 +411,16 @@ class DataFrame protected[sql]( */ override def agg(exprs: Map[String, String]): DataFrame = groupBy().agg(exprs) + /** + * Aggregates on the entire [[DataFrame]] without groups. + * {{ + * // df.agg(...) is a shorthand for df.groupBy().agg(...) + * df.agg(Map("age" -> "max", "salary" -> "avg")) + * df.groupBy().agg(Map("age" -> "max", "salary" -> "avg")) + * }} + */ + override def agg(exprs: java.util.Map[String, String]): DataFrame = agg(exprs.toMap) + /** * Aggregates on the entire [[DataFrame]] without groups. * {{ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dsl.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dsl.scala index f47ff995e919b..75717e7cd842c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dsl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dsl.scala @@ -62,6 +62,11 @@ object Dsl { */ def col(colName: String): Column = new Column(colName) + /** + * Returns a [[Column]] based on the given column name. Alias of [[col]]. + */ + def column(colName: String): Column = new Column(colName) + /** * Creates a [[Column]] of literal value. */ @@ -96,6 +101,7 @@ object Dsl { def sumDistinct(e: Column): Column = SumDistinct(e.expr) def count(e: Column): Column = Count(e.expr) + @scala.annotation.varargs def countDistinct(expr: Column, exprs: Column*): Column = CountDistinct((expr +: exprs).map(_.expr)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataFrame.scala index 1f1e9bd9899f6..1c948cbbfe58f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataFrame.scala @@ -58,7 +58,9 @@ class GroupedDataFrame protected[sql](df: DataFrame, groupingExprs: Seq[Expressi } /** - * Compute aggregates by specifying a map from column name to aggregate methods. + * Compute aggregates by specifying a map from column name to aggregate methods. The resulting + * [[DataFrame]] will also contain the grouping columns. + * * The available aggregate methods are `avg`, `max`, `min`, `sum`, `count`. * {{{ * // Selects the age of the oldest employee and the aggregate expense for each department @@ -76,7 +78,9 @@ class GroupedDataFrame protected[sql](df: DataFrame, groupingExprs: Seq[Expressi } /** - * Compute aggregates by specifying a map from column name to aggregate methods. + * Compute aggregates by specifying a map from column name to aggregate methods. The resulting + * [[DataFrame]] will also contain the grouping columns. + * * The available aggregate methods are `avg`, `max`, `min`, `sum`, `count`. * {{{ * // Selects the age of the oldest employee and the aggregate expense for each department @@ -91,12 +95,15 @@ class GroupedDataFrame protected[sql](df: DataFrame, groupingExprs: Seq[Expressi } /** - * Compute aggregates by specifying a series of aggregate columns. - * The available aggregate methods are defined in [[org.apache.spark.sql.dsl]]. + * Compute aggregates by specifying a series of aggregate columns. Unlike other methods in this + * class, the resulting [[DataFrame]] won't automatically include the grouping columns. + * + * The available aggregate methods are defined in [[org.apache.spark.sql.Dsl]]. + * * {{{ * // Selects the age of the oldest employee and the aggregate expense for each department * import org.apache.spark.sql.dsl._ - * df.groupBy("department").agg(max($"age"), sum($"expense")) + * df.groupBy("department").agg($"department", max($"age"), sum($"expense")) * }}} */ @scala.annotation.varargs @@ -109,31 +116,39 @@ class GroupedDataFrame protected[sql](df: DataFrame, groupingExprs: Seq[Expressi new DataFrame(df.sqlContext, Aggregate(groupingExprs, aggExprs, df.logicalPlan)) } - /** Count the number of rows for each group. */ + /** + * Count the number of rows for each group. + * The resulting [[DataFrame]] will also contain the grouping columns. + */ override def count(): DataFrame = Seq(Alias(Count(LiteralExpr(1)), "count")()) /** * Compute the average value for each numeric columns for each group. This is an alias for `avg`. + * The resulting [[DataFrame]] will also contain the grouping columns. */ override def mean(): DataFrame = aggregateNumericColumns(Average) /** * Compute the max value for each numeric columns for each group. + * The resulting [[DataFrame]] will also contain the grouping columns. */ override def max(): DataFrame = aggregateNumericColumns(Max) /** * Compute the mean value for each numeric columns for each group. + * The resulting [[DataFrame]] will also contain the grouping columns. */ override def avg(): DataFrame = aggregateNumericColumns(Average) /** * Compute the min value for each numeric column for each group. + * The resulting [[DataFrame]] will also contain the grouping columns. */ override def min(): DataFrame = aggregateNumericColumns(Min) /** * Compute the sum for each numeric columns for each group. + * The resulting [[DataFrame]] will also contain the grouping columns. */ override def sum(): DataFrame = aggregateNumericColumns(Sum) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api.scala b/sql/core/src/main/scala/org/apache/spark/sql/api.scala index 59634082f61c2..eb0eb3f32560c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api.scala @@ -113,16 +113,22 @@ private[sql] trait DataFrameSpecificApi { def agg(exprs: Map[String, String]): DataFrame + def agg(exprs: java.util.Map[String, String]): DataFrame + @scala.annotation.varargs def agg(expr: Column, exprs: Column*): DataFrame - def sort(colName: String): DataFrame + @scala.annotation.varargs + def sort(sortExpr: Column, sortExprs: Column*): DataFrame + + @scala.annotation.varargs + def sort(sortCol: String, sortCols: String*): DataFrame @scala.annotation.varargs def orderBy(sortExpr: Column, sortExprs: Column*): DataFrame @scala.annotation.varargs - def sort(sortExpr: Column, sortExprs: Column*): DataFrame + def orderBy(sortCol: String, sortCols: String*): DataFrame def join(right: DataFrame): DataFrame @@ -257,6 +263,7 @@ private[sql] trait ExpressionApi { def getField(fieldName: String): Column def cast(to: DataType): Column + def cast(to: String): Column def asc: Column def desc: Column diff --git a/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaDsl.java b/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaDsl.java new file mode 100644 index 0000000000000..639436368c4a3 --- /dev/null +++ b/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaDsl.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.api.java; + +import com.google.common.collect.ImmutableMap; + +import org.apache.spark.sql.Column; +import org.apache.spark.sql.DataFrame; +import org.apache.spark.sql.types.DataTypes; + +import static org.apache.spark.sql.Dsl.*; + +/** + * This test doesn't actually run anything. It is here to check the API compatibility for Java. + */ +public class JavaDsl { + + public static void testDataFrame(final DataFrame df) { + DataFrame df1 = df.select("colA"); + df1 = df.select("colA", "colB"); + + df1 = df.select(col("colA"), col("colB"), lit("literal value").$plus(1)); + + df1 = df.filter(col("colA")); + + java.util.Map aggExprs = ImmutableMap.builder() + .put("colA", "sum") + .put("colB", "avg") + .build(); + + df1 = df.agg(aggExprs); + + df1 = df.groupBy("groupCol").agg(aggExprs); + + df1 = df.join(df1, col("key1").$eq$eq$eq(col("key2")), "outer"); + + df.orderBy("colA"); + df.orderBy("colA", "colB", "colC"); + df.orderBy(col("colA").desc()); + df.orderBy(col("colA").desc(), col("colB").asc()); + + df.sort("colA"); + df.sort("colA", "colB", "colC"); + df.sort(col("colA").desc()); + df.sort(col("colA").desc(), col("colB").asc()); + + df.as("b"); + + df.limit(5); + + df.unionAll(df1); + df.intersect(df1); + df.except(df1); + + df.sample(true, 0.1, 234); + + df.head(); + df.head(5); + df.first(); + df.count(); + } + + public static void testColumn(final Column c) { + c.asc(); + c.desc(); + + c.endsWith("abcd"); + c.startsWith("afgasdf"); + + c.like("asdf%"); + c.rlike("wef%asdf"); + + c.as("newcol"); + + c.cast("int"); + c.cast(DataTypes.IntegerType); + } + + public static void testDsl() { + // Creating a column. + Column c = col("abcd"); + Column c1 = column("abcd"); + + // Literals + Column l1 = lit(1); + Column l2 = lit(1.0); + Column l3 = lit("abcd"); + + // Functions + Column a = upper(c); + a = lower(c); + a = sqrt(c); + a = abs(c); + + // Aggregates + a = min(c); + a = max(c); + a = sum(c); + a = sumDistinct(c); + a = countDistinct(c, a); + a = avg(c); + a = first(c); + a = last(c); + } +} From 5c746eedda8cff2fc1692cf6dce376f4b0ca6fac Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Thu, 29 Jan 2015 17:28:37 -0800 Subject: [PATCH 12/20] [SPARK-5395] [PySpark] fix python process leak while coalesce() Currently, the Python process is released into pool only after the task had finished, it cause many process forked if coalesce() is called. This PR will change it to release the process as soon as read all the data from it (finish the partition), then a process could be reused to process multiple partitions in a single task. Author: Davies Liu Closes #4238 from davies/py_leak and squashes the following commits: ec80a43 [Davies Liu] add @volatile 6da437a [Davies Liu] address comments 24ed322 [Davies Liu] fix python process leak while coalesce() --- .../org/apache/spark/api/python/PythonRDD.scala | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 4ac666c54fbcd..119e0459c5d1b 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -67,17 +67,16 @@ private[spark] class PythonRDD( envVars += ("SPARK_REUSE_WORKER" -> "1") } val worker: Socket = env.createPythonWorker(pythonExec, envVars.toMap) + // Whether is the worker released into idle pool + @volatile var released = false // Start a thread to feed the process input from our parent's iterator val writerThread = new WriterThread(env, worker, split, context) - var complete_cleanly = false context.addTaskCompletionListener { context => writerThread.shutdownOnTaskCompletion() writerThread.join() - if (reuse_worker && complete_cleanly) { - env.releasePythonWorker(pythonExec, envVars.toMap, worker) - } else { + if (!reuse_worker || !released) { try { worker.close() } catch { @@ -145,8 +144,12 @@ private[spark] class PythonRDD( stream.readFully(update) accumulator += Collections.singletonList(update) } + // Check whether the worker is ready to be re-used. if (stream.readInt() == SpecialLengths.END_OF_STREAM) { - complete_cleanly = true + if (reuse_worker) { + env.releasePythonWorker(pythonExec, envVars.toMap, worker) + released = true + } } null } From 22271f969363fd139e6cfb5a2d95a2607fb4e572 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 29 Jan 2015 18:23:05 -0800 Subject: [PATCH 13/20] [SPARK-5462] [SQL] Use analyzed query plan in DataFrame.apply() This patch changes DataFrame's `apply()` method to use an analyzed query plan when resolving column names. This fixes a bug where `apply` would throw "invalid call to qualifiers on unresolved object" errors when called on DataFrames constructed via `SQLContext.sql()`. Author: Josh Rosen Closes #4282 from JoshRosen/SPARK-5462 and squashes the following commits: b9e6da2 [Josh Rosen] [SPARK-5462] Use analyzed query plan in DataFrame.apply(). --- .../src/main/scala/org/apache/spark/sql/DataFrame.scala | 8 +++++--- .../test/scala/org/apache/spark/sql/DataFrameSuite.scala | 4 ++++ 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 1ff25adcf836a..2694e81eacf20 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -111,14 +111,16 @@ class DataFrame protected[sql]( /** Returns the list of numeric columns, useful for doing aggregation. */ protected[sql] def numericColumns: Seq[Expression] = { schema.fields.filter(_.dataType.isInstanceOf[NumericType]).map { n => - logicalPlan.resolve(n.name, sqlContext.analyzer.resolver).get + queryExecution.analyzed.resolve(n.name, sqlContext.analyzer.resolver).get } } /** Resolves a column name into a Catalyst [[NamedExpression]]. */ protected[sql] def resolve(colName: String): NamedExpression = { - logicalPlan.resolve(colName, sqlContext.analyzer.resolver).getOrElse(throw new RuntimeException( - s"""Cannot resolve column name "$colName" among (${schema.fieldNames.mkString(", ")})""")) + queryExecution.analyzed.resolve(colName, sqlContext.analyzer.resolver).getOrElse { + throw new RuntimeException( + s"""Cannot resolve column name "$colName" among (${schema.fieldNames.mkString(", ")})""") + } } /** Left here for compatibility reasons. */ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index db83a906d9648..df343adc793bd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -276,5 +276,9 @@ class DataFrameSuite extends QueryTest { ) } + test("apply on query results (SPARK-5462)") { + val df = testData.sqlContext.sql("select key from testData") + checkAnswer(df("key"), testData.select('key).collect().toSeq) + } } From 80def9deb3bfc30d5b622b32aecb0322341a7f62 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Thu, 29 Jan 2015 19:09:08 -0800 Subject: [PATCH 14/20] [SQL] Support df("*") to select all columns in a data frame. This PR makes Star a trait, and provides two implementations: UnresolvedStar (used for *, tblName.*) and ResolvedStar (used for df("*")). Author: Reynold Xin Closes #4283 from rxin/df-star and squashes the following commits: c9cba3e [Reynold Xin] Removed mapFunction in UnresolvedStar. 1a3a1d7 [Reynold Xin] [SQL] Support df("*") to select all columns in a data frame. --- .../apache/spark/sql/catalyst/SqlParser.scala | 2 +- .../sql/catalyst/analysis/unresolved.scala | 53 +++++++++++++------ .../sql/catalyst/analysis/AnalysisSuite.scala | 4 +- .../scala/org/apache/spark/sql/Column.scala | 6 +-- .../org/apache/spark/sql/DataFrame.scala | 4 +- .../spark/sql/ColumnExpressionSuite.scala | 8 ++- .../org/apache/spark/sql/hive/HiveQl.scala | 6 +-- 7 files changed, 54 insertions(+), 29 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index eaadbe9fd5099..24a65f8f4d379 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -348,7 +348,7 @@ class SqlParser extends AbstractSparkSQLParser { ) protected lazy val baseExpression: Parser[Expression] = - ( "*" ^^^ Star(None) + ( "*" ^^^ UnresolvedStar(None) | primary ) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index 71a738a0b2ca0..66060289189ef 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -50,7 +50,7 @@ case class UnresolvedAttribute(name: String) extends Attribute with trees.LeafNo override def qualifiers = throw new UnresolvedException(this, "qualifiers") override lazy val resolved = false - override def newInstance = this + override def newInstance() = this override def withNullability(newNullability: Boolean) = this override def withQualifiers(newQualifiers: Seq[String]) = this override def withName(newName: String) = UnresolvedAttribute(name) @@ -77,15 +77,10 @@ case class UnresolvedFunction(name: String, children: Seq[Expression]) extends E /** * Represents all of the input attributes to a given relational operator, for example in - * "SELECT * FROM ...". - * - * @param table an optional table that should be the target of the expansion. If omitted all - * tables' columns are produced. + * "SELECT * FROM ...". A [[Star]] gets automatically expanded during analysis. */ -case class Star( - table: Option[String], - mapFunction: Attribute => Expression = identity[Attribute]) - extends Attribute with trees.LeafNode[Expression] { +trait Star extends Attribute with trees.LeafNode[Expression] { + self: Product => override def name = throw new UnresolvedException(this, "name") override def exprId = throw new UnresolvedException(this, "exprId") @@ -94,29 +89,53 @@ case class Star( override def qualifiers = throw new UnresolvedException(this, "qualifiers") override lazy val resolved = false - override def newInstance = this + override def newInstance() = this override def withNullability(newNullability: Boolean) = this override def withQualifiers(newQualifiers: Seq[String]) = this override def withName(newName: String) = this - def expand(input: Seq[Attribute], resolver: Resolver): Seq[NamedExpression] = { + // Star gets expanded at runtime so we never evaluate a Star. + override def eval(input: Row = null): EvaluatedType = + throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}") + + def expand(input: Seq[Attribute], resolver: Resolver): Seq[NamedExpression] +} + + +/** + * Represents all of the input attributes to a given relational operator, for example in + * "SELECT * FROM ...". + * + * @param table an optional table that should be the target of the expansion. If omitted all + * tables' columns are produced. + */ +case class UnresolvedStar(table: Option[String]) extends Star { + + override def expand(input: Seq[Attribute], resolver: Resolver): Seq[NamedExpression] = { val expandedAttributes: Seq[Attribute] = table match { // If there is no table specified, use all input attributes. case None => input // If there is a table, pick out attributes that are part of this table. case Some(t) => input.filter(_.qualifiers.filter(resolver(_, t)).nonEmpty) } - val mappedAttributes = expandedAttributes.map(mapFunction).zip(input).map { + expandedAttributes.zip(input).map { case (n: NamedExpression, _) => n case (e, originalAttribute) => Alias(e, originalAttribute.name)(qualifiers = originalAttribute.qualifiers) } - mappedAttributes } - // Star gets expanded at runtime so we never evaluate a Star. - override def eval(input: Row = null): EvaluatedType = - throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}") - override def toString = table.map(_ + ".").getOrElse("") + "*" } + + +/** + * Represents all the resolved input attributes to a given relational operator. This is used + * in the data frame DSL. + * + * @param expressions Expressions to expand. + */ +case class ResolvedStar(expressions: Seq[NamedExpression]) extends Star { + override def expand(input: Seq[Attribute], resolver: Resolver): Seq[NamedExpression] = expressions + override def toString = expressions.mkString("ResolvedStar(", ", ", ")") +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index 3aea337460d42..60060bf02913b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -51,7 +51,9 @@ class AnalysisSuite extends FunSuite with BeforeAndAfter { test("union project *") { val plan = (1 to 100) .map(_ => testRelation) - .fold[LogicalPlan](testRelation)((a,b) => a.select(Star(None)).select('a).unionAll(b.select(Star(None)))) + .fold[LogicalPlan](testRelation) { (a, b) => + a.select(UnresolvedStar(None)).select('a).unionAll(b.select(UnresolvedStar(None))) + } assert(caseInsensitiveAnalyze(plan).resolved) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala index 68c9cb0c02018..174c403059510 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql import scala.language.implicitConversions import org.apache.spark.sql.Dsl.lit -import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, Star} +import org.apache.spark.sql.catalyst.analysis.{UnresolvedStar, UnresolvedAttribute} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical.{Project, LogicalPlan} import org.apache.spark.sql.types._ @@ -71,8 +71,8 @@ class Column( * - "df.*" becomes an expression selecting all columns in data frame "df". */ def this(name: String) = this(name match { - case "*" => Star(None) - case _ if name.endsWith(".*") => Star(Some(name.substring(0, name.length - 2))) + case "*" => UnresolvedStar(None) + case _ if name.endsWith(".*") => UnresolvedStar(Some(name.substring(0, name.length - 2))) case _ => UnresolvedAttribute(name) }) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 2694e81eacf20..1096e396591df 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -31,7 +31,7 @@ import org.apache.spark.api.python.SerDeUtil import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.apache.spark.sql.catalyst.ScalaReflection -import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +import org.apache.spark.sql.catalyst.analysis.{ResolvedStar, UnresolvedRelation} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.{JoinType, Inner} import org.apache.spark.sql.catalyst.plans.logical._ @@ -265,7 +265,7 @@ class DataFrame protected[sql]( */ override def apply(colName: String): Column = colName match { case "*" => - Column("*") + new Column(ResolvedStar(schema.fieldNames.map(resolve))) case _ => val expr = resolve(colName) new Column(Some(sqlContext), Some(Project(Seq(expr), logicalPlan)), expr) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala index 6428554ec749d..2d464c2b53d79 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala @@ -31,10 +31,14 @@ class ColumnExpressionSuite extends QueryTest { checkAnswer(testData.select($"*"), testData.collect().toSeq) } - ignore("star qualified by data frame object") { + test("star qualified by data frame object") { // This is not yet supported. val df = testData.toDataFrame - checkAnswer(df.select(df("*")), df.collect().toSeq) + val goldAnswer = df.collect().toSeq + checkAnswer(df.select(df("*")), goldAnswer) + + val df1 = df.select(df("*"), lit("abcd").as("litCol")) + checkAnswer(df1.select(df("*")), goldAnswer) } test("star qualified by table name") { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index 5e29e57d93585..399e58b259a45 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -1002,11 +1002,11 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C } /* Stars (*) */ - case Token("TOK_ALLCOLREF", Nil) => Star(None) + case Token("TOK_ALLCOLREF", Nil) => UnresolvedStar(None) // The format of dbName.tableName.* cannot be parsed by HiveParser. TOK_TABNAME will only // has a single child which is tableName. case Token("TOK_ALLCOLREF", Token("TOK_TABNAME", Token(name, Nil) :: Nil) :: Nil) => - Star(Some(name)) + UnresolvedStar(Some(name)) /* Aggregate Functions */ case Token("TOK_FUNCTION", Token(AVG(), Nil) :: arg :: Nil) => Average(nodeToExpr(arg)) @@ -1145,7 +1145,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C case Token("TOK_FUNCTION", Token(name, Nil) :: args) => UnresolvedFunction(name, args.map(nodeToExpr)) case Token("TOK_FUNCTIONSTAR", Token(name, Nil) :: args) => - UnresolvedFunction(name, Star(None) :: Nil) + UnresolvedFunction(name, UnresolvedStar(None) :: Nil) /* Literals */ case Token("TOK_NULL", Nil) => Literal(null, NullType) From dd4d84cf809e6e425958fe768c518679d1828779 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Thu, 29 Jan 2015 21:26:29 -0800 Subject: [PATCH 15/20] [SPARK-5322] Added transpose functionality to BlockMatrix BlockMatrices can now be transposed! Author: Burak Yavuz Closes #4275 from brkyvz/SPARK-5322 and squashes the following commits: 33806ed [Burak Yavuz] added lazy comment 33e9219 [Burak Yavuz] made transpose lazy 5a274cd [Burak Yavuz] added cached tests 5dcf85c [Burak Yavuz] [SPARK-5322] Added transpose functionality to BlockMatrix --- .../linalg/distributed/BlockMatrix.scala | 9 ++++++ .../linalg/distributed/BlockMatrixSuite.scala | 29 +++++++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala index 426dbf4805d5f..693419f827379 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala @@ -232,6 +232,15 @@ class BlockMatrix( new DenseMatrix(m, n, values) } + /** Transpose this `BlockMatrix`. Returns a new `BlockMatrix` instance sharing the + * same underlying data. Is a lazy operation. */ + def transpose: BlockMatrix = { + val transposedBlocks = blocks.map { case ((blockRowIndex, blockColIndex), mat) => + ((blockColIndex, blockRowIndex), mat.transpose) + } + new BlockMatrix(transposedBlocks, colsPerBlock, rowsPerBlock, nCols, nRows) + } + /** Collects data and assembles a local dense breeze matrix (for test only). */ private[mllib] def toBreeze(): BDM[Double] = { val localMat = toLocalMatrix() diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala index 7284d03d243f5..03f34308dd09b 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala @@ -146,4 +146,33 @@ class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext { assert(gridBasedMat.toLocalMatrix() === dense) assert(gridBasedMat.toBreeze() === expected) } + + test("transpose") { + val expected = BDM( + (1.0, 0.0, 3.0, 0.0, 0.0), + (0.0, 2.0, 1.0, 1.0, 0.0), + (0.0, 1.0, 1.0, 2.0, 1.0), + (0.0, 0.0, 0.0, 1.0, 5.0)) + + val AT = gridBasedMat.transpose + assert(AT.numRows() === gridBasedMat.numCols()) + assert(AT.numCols() === gridBasedMat.numRows()) + assert(AT.toBreeze() === expected) + + // partitioner must update as well + val originalPartitioner = gridBasedMat.partitioner + val ATpartitioner = AT.partitioner + assert(originalPartitioner.colsPerPart === ATpartitioner.rowsPerPart) + assert(originalPartitioner.rowsPerPart === ATpartitioner.colsPerPart) + assert(originalPartitioner.cols === ATpartitioner.rows) + assert(originalPartitioner.rows === ATpartitioner.cols) + + // make sure it works when matrices are cached as well + gridBasedMat.cache() + val AT2 = gridBasedMat.transpose + AT2.cache() + assert(AT2.toBreeze() === AT.toBreeze()) + val A = AT2.transpose + assert(A.toBreeze() === gridBasedMat.toBreeze()) + } } From bc1fc9b60dab69ae74419e35dc6bd263dc504f34 Mon Sep 17 00:00:00 2001 From: Kazuki Taniguchi Date: Fri, 30 Jan 2015 00:39:44 -0800 Subject: [PATCH 16/20] [SPARK-5094][MLlib] Add Python API for Gradient Boosted Trees This PR is implementing the Gradient Boosted Trees for Python API. Author: Kazuki Taniguchi Closes #3951 from kazk1018/gbt_for_py and squashes the following commits: 620d247 [Kazuki Taniguchi] [SPARK-5094][MLlib] Add Python API for Gradient Boosted Trees --- .../python/mllib/gradient_boosted_trees.py | 76 ++++++ .../mllib/api/python/PythonMLLibAPI.scala | 36 ++- python/pyspark/mllib/tests.py | 41 +++- python/pyspark/mllib/tree.py | 221 ++++++++++++++---- 4 files changed, 318 insertions(+), 56 deletions(-) create mode 100644 examples/src/main/python/mllib/gradient_boosted_trees.py diff --git a/examples/src/main/python/mllib/gradient_boosted_trees.py b/examples/src/main/python/mllib/gradient_boosted_trees.py new file mode 100644 index 0000000000000..e647773ad9060 --- /dev/null +++ b/examples/src/main/python/mllib/gradient_boosted_trees.py @@ -0,0 +1,76 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Gradient boosted Trees classification and regression using MLlib. +""" + +import sys + +from pyspark.context import SparkContext +from pyspark.mllib.tree import GradientBoostedTrees +from pyspark.mllib.util import MLUtils + + +def testClassification(trainingData, testData): + # Train a GradientBoostedTrees model. + # Empty categoricalFeaturesInfo indicates all features are continuous. + model = GradientBoostedTrees.trainClassifier(trainingData, categoricalFeaturesInfo={}, + numIterations=30, maxDepth=4) + # Evaluate model on test instances and compute test error + predictions = model.predict(testData.map(lambda x: x.features)) + labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions) + testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() \ + / float(testData.count()) + print('Test Error = ' + str(testErr)) + print('Learned classification ensemble model:') + print(model.toDebugString()) + + +def testRegression(trainingData, testData): + # Train a GradientBoostedTrees model. + # Empty categoricalFeaturesInfo indicates all features are continuous. + model = GradientBoostedTrees.trainRegressor(trainingData, categoricalFeaturesInfo={}, + numIterations=30, maxDepth=4) + # Evaluate model on test instances and compute test error + predictions = model.predict(testData.map(lambda x: x.features)) + labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions) + testMSE = labelsAndPredictions.map(lambda (v, p): (v - p) * (v - p)).sum() \ + / float(testData.count()) + print('Test Mean Squared Error = ' + str(testMSE)) + print('Learned regression ensemble model:') + print(model.toDebugString()) + + +if __name__ == "__main__": + if len(sys.argv) > 1: + print >> sys.stderr, "Usage: gradient_boosted_trees" + exit(1) + sc = SparkContext(appName="PythonGradientBoostedTrees") + + # Load and parse the data file into an RDD of LabeledPoint. + data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt') + # Split the data into training and test sets (30% held out for testing) + (trainingData, testData) = data.randomSplit([0.7, 0.3]) + + print('\nRunning example of classification using GradientBoostedTrees\n') + testClassification(trainingData, testData) + + print('\nRunning example of regression using GradientBoostedTrees\n') + testRegression(trainingData, testData) + + sc.stop() diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 430d763ef7ca7..a66d6f0cf29c7 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -41,10 +41,11 @@ import org.apache.spark.mllib.regression._ import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics} import org.apache.spark.mllib.stat.correlation.CorrelationNames import org.apache.spark.mllib.stat.test.ChiSqTestResult -import org.apache.spark.mllib.tree.{RandomForest, DecisionTree} -import org.apache.spark.mllib.tree.configuration.{Algo, Strategy} +import org.apache.spark.mllib.tree.{GradientBoostedTrees, RandomForest, DecisionTree} +import org.apache.spark.mllib.tree.configuration.{BoostingStrategy, Algo, Strategy} import org.apache.spark.mllib.tree.impurity._ -import org.apache.spark.mllib.tree.model.{RandomForestModel, DecisionTreeModel} +import org.apache.spark.mllib.tree.loss.Losses +import org.apache.spark.mllib.tree.model.{GradientBoostedTreesModel, RandomForestModel, DecisionTreeModel} import org.apache.spark.mllib.util.MLUtils import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel @@ -532,6 +533,35 @@ class PythonMLLibAPI extends Serializable { } } + /** + * Java stub for Python mllib GradientBoostedTrees.train(). + * This stub returns a handle to the Java object instead of the content of the Java object. + * Extra care needs to be taken in the Python code to ensure it gets freed on exit; + * see the Py4J documentation. + */ + def trainGradientBoostedTreesModel( + data: JavaRDD[LabeledPoint], + algoStr: String, + categoricalFeaturesInfo: JMap[Int, Int], + lossStr: String, + numIterations: Int, + learningRate: Double, + maxDepth: Int): GradientBoostedTreesModel = { + val boostingStrategy = BoostingStrategy.defaultParams(algoStr) + boostingStrategy.setLoss(Losses.fromString(lossStr)) + boostingStrategy.setNumIterations(numIterations) + boostingStrategy.setLearningRate(learningRate) + boostingStrategy.treeStrategy.setMaxDepth(maxDepth) + boostingStrategy.treeStrategy.categoricalFeaturesInfo = categoricalFeaturesInfo.asScala.toMap + + val cached = data.rdd.persist(StorageLevel.MEMORY_AND_DISK) + try { + GradientBoostedTrees.train(cached, boostingStrategy) + } finally { + cached.unpersist(blocking = false) + } + } + /** * Java stub for mllib Statistics.colStats(X: RDD[Vector]). * TODO figure out return type. diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py index f48e3d6dacb4b..61e0cf5d90bd0 100644 --- a/python/pyspark/mllib/tests.py +++ b/python/pyspark/mllib/tests.py @@ -169,7 +169,7 @@ def test_kmeans_deterministic(self): def test_classification(self): from pyspark.mllib.classification import LogisticRegressionWithSGD, SVMWithSGD, NaiveBayes - from pyspark.mllib.tree import DecisionTree + from pyspark.mllib.tree import DecisionTree, RandomForest, GradientBoostedTrees data = [ LabeledPoint(0.0, [1, 0, 0]), LabeledPoint(1.0, [0, 1, 1]), @@ -198,18 +198,31 @@ def test_classification(self): self.assertTrue(nb_model.predict(features[3]) > 0) categoricalFeaturesInfo = {0: 3} # feature 0 has 3 categories - dt_model = \ - DecisionTree.trainClassifier(rdd, numClasses=2, - categoricalFeaturesInfo=categoricalFeaturesInfo) + dt_model = DecisionTree.trainClassifier( + rdd, numClasses=2, categoricalFeaturesInfo=categoricalFeaturesInfo) self.assertTrue(dt_model.predict(features[0]) <= 0) self.assertTrue(dt_model.predict(features[1]) > 0) self.assertTrue(dt_model.predict(features[2]) <= 0) self.assertTrue(dt_model.predict(features[3]) > 0) + rf_model = RandomForest.trainClassifier( + rdd, numClasses=2, categoricalFeaturesInfo=categoricalFeaturesInfo, numTrees=100) + self.assertTrue(rf_model.predict(features[0]) <= 0) + self.assertTrue(rf_model.predict(features[1]) > 0) + self.assertTrue(rf_model.predict(features[2]) <= 0) + self.assertTrue(rf_model.predict(features[3]) > 0) + + gbt_model = GradientBoostedTrees.trainClassifier( + rdd, categoricalFeaturesInfo=categoricalFeaturesInfo) + self.assertTrue(gbt_model.predict(features[0]) <= 0) + self.assertTrue(gbt_model.predict(features[1]) > 0) + self.assertTrue(gbt_model.predict(features[2]) <= 0) + self.assertTrue(gbt_model.predict(features[3]) > 0) + def test_regression(self): from pyspark.mllib.regression import LinearRegressionWithSGD, LassoWithSGD, \ RidgeRegressionWithSGD - from pyspark.mllib.tree import DecisionTree + from pyspark.mllib.tree import DecisionTree, RandomForest, GradientBoostedTrees data = [ LabeledPoint(-1.0, [0, -1]), LabeledPoint(1.0, [0, 1]), @@ -238,13 +251,27 @@ def test_regression(self): self.assertTrue(rr_model.predict(features[3]) > 0) categoricalFeaturesInfo = {0: 2} # feature 0 has 2 categories - dt_model = \ - DecisionTree.trainRegressor(rdd, categoricalFeaturesInfo=categoricalFeaturesInfo) + dt_model = DecisionTree.trainRegressor( + rdd, categoricalFeaturesInfo=categoricalFeaturesInfo) self.assertTrue(dt_model.predict(features[0]) <= 0) self.assertTrue(dt_model.predict(features[1]) > 0) self.assertTrue(dt_model.predict(features[2]) <= 0) self.assertTrue(dt_model.predict(features[3]) > 0) + rf_model = RandomForest.trainRegressor( + rdd, categoricalFeaturesInfo=categoricalFeaturesInfo, numTrees=100) + self.assertTrue(rf_model.predict(features[0]) <= 0) + self.assertTrue(rf_model.predict(features[1]) > 0) + self.assertTrue(rf_model.predict(features[2]) <= 0) + self.assertTrue(rf_model.predict(features[3]) > 0) + + gbt_model = GradientBoostedTrees.trainRegressor( + rdd, categoricalFeaturesInfo=categoricalFeaturesInfo) + self.assertTrue(gbt_model.predict(features[0]) <= 0) + self.assertTrue(gbt_model.predict(features[1]) > 0) + self.assertTrue(gbt_model.predict(features[2]) <= 0) + self.assertTrue(gbt_model.predict(features[3]) > 0) + class StatTests(PySparkTestCase): # SPARK-4023 diff --git a/python/pyspark/mllib/tree.py b/python/pyspark/mllib/tree.py index 66702478474dc..aae48f213246b 100644 --- a/python/pyspark/mllib/tree.py +++ b/python/pyspark/mllib/tree.py @@ -24,16 +24,48 @@ from pyspark.mllib.linalg import _convert_to_vector from pyspark.mllib.regression import LabeledPoint -__all__ = ['DecisionTreeModel', 'DecisionTree', 'RandomForestModel', 'RandomForest'] +__all__ = ['DecisionTreeModel', 'DecisionTree', 'RandomForestModel', + 'RandomForest', 'GradientBoostedTrees'] -class DecisionTreeModel(JavaModelWrapper): +class TreeEnsembleModel(JavaModelWrapper): + def predict(self, x): + """ + Predict values for a single data point or an RDD of points using + the model trained. + """ + if isinstance(x, RDD): + return self.call("predict", x.map(_convert_to_vector)) + + else: + return self.call("predict", _convert_to_vector(x)) + + def numTrees(self): + """ + Get number of trees in ensemble. + """ + return self.call("numTrees") + + def totalNumNodes(self): + """ + Get total number of nodes, summed over all trees in the ensemble. + """ + return self.call("totalNumNodes") + + def __repr__(self): + """ Summary of model """ + return self._java_model.toString() + + def toDebugString(self): + """ Full model """ + return self._java_model.toDebugString() + +class DecisionTreeModel(JavaModelWrapper): """ - A decision tree model for classification or regression. + .. note:: Experimental - EXPERIMENTAL: This is an experimental API. - It will probably be modified in future. + A decision tree model for classification or regression. """ def predict(self, x): """ @@ -64,12 +96,10 @@ def toDebugString(self): class DecisionTree(object): - """ - Learning algorithm for a decision tree model for classification or regression. + .. note:: Experimental - EXPERIMENTAL: This is an experimental API. - It will probably be modified in future. + Learning algorithm for a decision tree model for classification or regression. """ @classmethod @@ -186,51 +216,19 @@ def trainRegressor(cls, data, categoricalFeaturesInfo, impurity, maxDepth, maxBins, minInstancesPerNode, minInfoGain) -class RandomForestModel(JavaModelWrapper): +class RandomForestModel(TreeEnsembleModel): """ - Represents a random forest model. + .. note:: Experimental - EXPERIMENTAL: This is an experimental API. - It will probably be modified in future. + Represents a random forest model. """ - def predict(self, x): - """ - Predict values for a single data point or an RDD of points using - the model trained. - """ - if isinstance(x, RDD): - return self.call("predict", x.map(_convert_to_vector)) - - else: - return self.call("predict", _convert_to_vector(x)) - - def numTrees(self): - """ - Get number of trees in forest. - """ - return self.call("numTrees") - - def totalNumNodes(self): - """ - Get total number of nodes, summed over all trees in the forest. - """ - return self.call("totalNumNodes") - - def __repr__(self): - """ Summary of model """ - return self._java_model.toString() - - def toDebugString(self): - """ Full model """ - return self._java_model.toDebugString() class RandomForest(object): """ - Learning algorithm for a random forest model for classification or regression. + .. note:: Experimental - EXPERIMENTAL: This is an experimental API. - It will probably be modified in future. + Learning algorithm for a random forest model for classification or regression. """ supportedFeatureSubsetStrategies = ("auto", "all", "sqrt", "log2", "onethird") @@ -383,6 +381,137 @@ def trainRegressor(cls, data, categoricalFeaturesInfo, numTrees, featureSubsetSt featureSubsetStrategy, impurity, maxDepth, maxBins, seed) +class GradientBoostedTreesModel(TreeEnsembleModel): + """ + .. note:: Experimental + + Represents a gradient-boosted tree model. + """ + + +class GradientBoostedTrees(object): + """ + .. note:: Experimental + + Learning algorithm for a gradient boosted trees model for classification or regression. + """ + + @classmethod + def _train(cls, data, algo, categoricalFeaturesInfo, + loss, numIterations, learningRate, maxDepth): + first = data.first() + assert isinstance(first, LabeledPoint), "the data should be RDD of LabeledPoint" + model = callMLlibFunc("trainGradientBoostedTreesModel", data, algo, categoricalFeaturesInfo, + loss, numIterations, learningRate, maxDepth) + return GradientBoostedTreesModel(model) + + @classmethod + def trainClassifier(cls, data, categoricalFeaturesInfo, + loss="logLoss", numIterations=100, learningRate=0.1, maxDepth=3): + """ + Method to train a gradient-boosted trees model for classification. + + :param data: Training dataset: RDD of LabeledPoint. Labels should take values {0, 1}. + :param categoricalFeaturesInfo: Map storing arity of categorical + features. E.g., an entry (n -> k) indicates that feature + n is categorical with k categories indexed from 0: + {0, 1, ..., k-1}. + :param loss: Loss function used for minimization during gradient boosting. + Supported: {"logLoss" (default), "leastSquaresError", "leastAbsoluteError"}. + :param numIterations: Number of iterations of boosting. + (default: 100) + :param learningRate: Learning rate for shrinking the contribution of each estimator. + The learning rate should be between in the interval (0, 1] + (default: 0.1) + :param maxDepth: Maximum depth of the tree. E.g., depth 0 means 1 + leaf node; depth 1 means 1 internal node + 2 leaf nodes. + (default: 3) + :return: GradientBoostedTreesModel that can be used for prediction + + Example usage: + + >>> from pyspark.mllib.regression import LabeledPoint + >>> from pyspark.mllib.tree import GradientBoostedTrees + >>> + >>> data = [ + ... LabeledPoint(0.0, [0.0]), + ... LabeledPoint(0.0, [1.0]), + ... LabeledPoint(1.0, [2.0]), + ... LabeledPoint(1.0, [3.0]) + ... ] + >>> + >>> model = GradientBoostedTrees.trainClassifier(sc.parallelize(data), {}) + >>> model.numTrees() + 100 + >>> model.totalNumNodes() + 300 + >>> print model, # it already has newline + TreeEnsembleModel classifier with 100 trees + >>> model.predict([2.0]) + 1.0 + >>> model.predict([0.0]) + 0.0 + >>> rdd = sc.parallelize([[2.0], [0.0]]) + >>> model.predict(rdd).collect() + [1.0, 0.0] + """ + return cls._train(data, "classification", categoricalFeaturesInfo, + loss, numIterations, learningRate, maxDepth) + + @classmethod + def trainRegressor(cls, data, categoricalFeaturesInfo, + loss="leastSquaresError", numIterations=100, learningRate=0.1, maxDepth=3): + """ + Method to train a gradient-boosted trees model for regression. + + :param data: Training dataset: RDD of LabeledPoint. Labels are + real numbers. + :param categoricalFeaturesInfo: Map storing arity of categorical + features. E.g., an entry (n -> k) indicates that feature + n is categorical with k categories indexed from 0: + {0, 1, ..., k-1}. + :param loss: Loss function used for minimization during gradient boosting. + Supported: {"logLoss" (default), "leastSquaresError", "leastAbsoluteError"}. + :param numIterations: Number of iterations of boosting. + (default: 100) + :param learningRate: Learning rate for shrinking the contribution of each estimator. + The learning rate should be between in the interval (0, 1] + (default: 0.1) + :param maxDepth: Maximum depth of the tree. E.g., depth 0 means 1 + leaf node; depth 1 means 1 internal node + 2 leaf nodes. + (default: 3) + :return: GradientBoostedTreesModel that can be used for prediction + + Example usage: + + >>> from pyspark.mllib.regression import LabeledPoint + >>> from pyspark.mllib.tree import GradientBoostedTrees + >>> from pyspark.mllib.linalg import SparseVector + >>> + >>> sparse_data = [ + ... LabeledPoint(0.0, SparseVector(2, {0: 1.0})), + ... LabeledPoint(1.0, SparseVector(2, {1: 1.0})), + ... LabeledPoint(0.0, SparseVector(2, {0: 1.0})), + ... LabeledPoint(1.0, SparseVector(2, {1: 2.0})) + ... ] + >>> + >>> model = GradientBoostedTrees.trainRegressor(sc.parallelize(sparse_data), {}) + >>> model.numTrees() + 100 + >>> model.totalNumNodes() + 102 + >>> model.predict(SparseVector(2, {1: 1.0})) + 1.0 + >>> model.predict(SparseVector(2, {0: 1.0})) + 0.0 + >>> rdd = sc.parallelize([[0.0, 1.0], [1.0, 0.0]]) + >>> model.predict(rdd).collect() + [1.0, 0.0] + """ + return cls._train(data, "regression", categoricalFeaturesInfo, + loss, numIterations, learningRate, maxDepth) + + def _test(): import doctest globs = globals().copy() From 6f21dce5f4619e1a5d07028e2a74dc36be0849b9 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Fri, 30 Jan 2015 01:21:35 -0800 Subject: [PATCH 17/20] [SPARK-5457][SQL] Add missing DSL for ApproxCountDistinct. Author: Takuya UESHIN Closes #4250 from ueshin/issues/SPARK-5457 and squashes the following commits: 3c05e59 [Takuya UESHIN] Remove parameter to use default value of ApproxCountDistinct. faea19d [Takuya UESHIN] Use overload instead of default value for Java support. d1cca38 [Takuya UESHIN] Merge branch 'master' into issues/SPARK-5457 663d43d [Takuya UESHIN] Add missing DSL for ApproxCountDistinct. --- sql/core/src/main/scala/org/apache/spark/sql/Dsl.scala | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dsl.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dsl.scala index 75717e7cd842c..3499956023d11 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dsl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dsl.scala @@ -105,6 +105,11 @@ object Dsl { def countDistinct(expr: Column, exprs: Column*): Column = CountDistinct((expr +: exprs).map(_.expr)) + def approxCountDistinct(e: Column): Column = + ApproxCountDistinct(e.expr) + def approxCountDistinct(e: Column, rsd: Double): Column = + ApproxCountDistinct(e.expr, rsd) + def avg(e: Column): Column = Average(e.expr) def first(e: Column): Column = First(e.expr) def last(e: Column): Column = Last(e.expr) From 254eaa4d350dafe19f1715e80eb816856a126c21 Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Fri, 30 Jan 2015 11:31:54 -0600 Subject: [PATCH 18/20] SPARK-5393. Flood of util.RackResolver log messages after SPARK-1714 Previously I had tried to solve this with by adding a line in Spark's log4j-defaults.properties. The issue with the message in log4j-defaults.properties was that the log4j.properties packaged inside Hadoop was getting picked up instead. While it would be ideal to fix that as well, we still want to quiet this in situations where a user supplies their own custom log4j properties. Author: Sandy Ryza Closes #4192 from sryza/sandy-spark-5393 and squashes the following commits: 4d5dedc [Sandy Ryza] Only set log level if unset 46e07c5 [Sandy Ryza] SPARK-5393. Flood of util.RackResolver log messages after SPARK-1714 --- .../org/apache/spark/log4j-defaults.properties | 1 - .../scala/org/apache/spark/SparkContext.scala | 2 +- .../SparkContextSchedulerCreationSuite.scala | 2 +- .../spark/deploy/yarn/YarnAllocator.scala | 7 +++++++ .../deploy/yarn/YarnSparkHadoopUtil.scala | 4 ---- .../cluster/YarnClusterScheduler.scala | 18 +----------------- ...sterScheduler.scala => YarnScheduler.scala} | 12 ++++++++---- 7 files changed, 18 insertions(+), 28 deletions(-) rename yarn/src/main/scala/org/apache/spark/scheduler/cluster/{YarnClientClusterScheduler.scala => YarnScheduler.scala} (77%) diff --git a/core/src/main/resources/org/apache/spark/log4j-defaults.properties b/core/src/main/resources/org/apache/spark/log4j-defaults.properties index c99a61f63ea2b..89eec7d4b7f61 100644 --- a/core/src/main/resources/org/apache/spark/log4j-defaults.properties +++ b/core/src/main/resources/org/apache/spark/log4j-defaults.properties @@ -10,4 +10,3 @@ log4j.logger.org.eclipse.jetty=WARN log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO -log4j.logger.org.apache.hadoop.yarn.util.RackResolver=WARN diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 4c4ee04cc515e..3c61c10820ba9 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1986,7 +1986,7 @@ object SparkContext extends Logging { case "yarn-client" => val scheduler = try { val clazz = - Class.forName("org.apache.spark.scheduler.cluster.YarnClientClusterScheduler") + Class.forName("org.apache.spark.scheduler.cluster.YarnScheduler") val cons = clazz.getConstructor(classOf[SparkContext]) cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl] diff --git a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala index 8ae4f243ec1ae..bbed8ddc6bafc 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala @@ -149,7 +149,7 @@ class SparkContextSchedulerCreationSuite } test("yarn-client") { - testYarn("yarn-client", "org.apache.spark.scheduler.cluster.YarnClientClusterScheduler") + testYarn("yarn-client", "org.apache.spark.scheduler.cluster.YarnScheduler") } def testMesos(master: String, expectedClass: Class[_], coarse: Boolean) { diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index d00f29665a58f..3849586c6111e 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -32,6 +32,8 @@ import org.apache.hadoop.yarn.client.api.AMRMClient import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest import org.apache.hadoop.yarn.util.RackResolver +import org.apache.log4j.{Level, Logger} + import org.apache.spark.{Logging, SecurityManager, SparkConf} import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend @@ -60,6 +62,11 @@ private[yarn] class YarnAllocator( import YarnAllocator._ + // RackResolver logs an INFO message whenever it resolves a rack, which is way too often. + if (Logger.getLogger(classOf[RackResolver]).getLevel == null) { + Logger.getLogger(classOf[RackResolver]).setLevel(Level.WARN) + } + // Visible for testing. val allocatedHostToContainersMap = new HashMap[String, collection.mutable.Set[ContainerId]] diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index 4bff846123619..4e39c1d58011b 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -17,12 +17,9 @@ package org.apache.spark.deploy.yarn -import java.lang.{Boolean => JBoolean} import java.io.File -import java.util.{Collections, Set => JSet} import java.util.regex.Matcher import java.util.regex.Pattern -import java.util.concurrent.ConcurrentHashMap import scala.collection.mutable.HashMap @@ -32,7 +29,6 @@ import org.apache.hadoop.security.Credentials import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.api.records.{Priority, ApplicationAccessType} -import org.apache.hadoop.yarn.util.RackResolver import org.apache.hadoop.conf.Configuration import org.apache.spark.{SecurityManager, SparkConf} diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala index be55d26f1cf61..72ec4d6b34af6 100644 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala @@ -17,33 +17,17 @@ package org.apache.spark.scheduler.cluster -import org.apache.hadoop.yarn.util.RackResolver - import org.apache.spark._ import org.apache.spark.deploy.yarn.ApplicationMaster -import org.apache.spark.scheduler.TaskSchedulerImpl -import org.apache.spark.util.Utils /** * This is a simple extension to ClusterScheduler - to ensure that appropriate initialization of * ApplicationMaster, etc is done */ -private[spark] class YarnClusterScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) { +private[spark] class YarnClusterScheduler(sc: SparkContext) extends YarnScheduler(sc) { logInfo("Created YarnClusterScheduler") - // Nothing else for now ... initialize application master : which needs a SparkContext to - // determine how to allocate. - // Note that only the first creation of a SparkContext influences (and ideally, there must be - // only one SparkContext, right ?). Subsequent creations are ignored since executors are already - // allocated by then. - - // By default, rack is unknown - override def getRackForHost(hostPort: String): Option[String] = { - val host = Utils.parseHostPort(hostPort)._1 - Option(RackResolver.resolve(sc.hadoopConfiguration, host).getNetworkLocation) - } - override def postStartHook() { ApplicationMaster.sparkContextInitialized(sc) super.postStartHook() diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnScheduler.scala similarity index 77% rename from yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala rename to yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnScheduler.scala index 2fa24cc43325e..4ebf3af12b381 100644 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnScheduler.scala @@ -19,14 +19,18 @@ package org.apache.spark.scheduler.cluster import org.apache.hadoop.yarn.util.RackResolver +import org.apache.log4j.{Level, Logger} + import org.apache.spark._ import org.apache.spark.scheduler.TaskSchedulerImpl import org.apache.spark.util.Utils -/** - * This scheduler launches executors through Yarn - by calling into Client to launch the Spark AM. - */ -private[spark] class YarnClientClusterScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) { +private[spark] class YarnScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) { + + // RackResolver logs an INFO message whenever it resolves a rack, which is way too often. + if (Logger.getLogger(classOf[RackResolver]).getLevel == null) { + Logger.getLogger(classOf[RackResolver]).setLevel(Level.WARN) + } // By default, rack is unknown override def getRackForHost(hostPort: String): Option[String] = { From 54d95758fcbe29a9af0f59673ac0b8a8c72b778e Mon Sep 17 00:00:00 2001 From: "Joseph J.C. Tang" Date: Fri, 30 Jan 2015 10:07:26 -0800 Subject: [PATCH 19/20] [MLLIB] SPARK-4846: throw a RuntimeException and give users hints to increase the minCount When the vocabSize\*vectorSize is larger than Int.MaxValue/8, we try to throw a RuntimeException. Because under this circumstance it would definitely throw an OOM when allocating memory to serialize the arrays syn0Global&syn1Global. syn0Global&syn1Global are float arrays. Serializing them should need a byte array of more than 8 times of syn0Global's size. Also if we catch an OOM even if vocabSize\*vectorSize is less than Int.MaxValue/8, we should give users hints to increase the minCount or decrease the vectorSize. Author: Joseph J.C. Tang Closes #4247 from jinntrance/w2v-fix and squashes the following commits: b5eb71f [Joseph J.C. Tang] throw a RuntimeException and give users hints regarding the vectorSize&minCount --- .../scala/org/apache/spark/mllib/feature/Word2Vec.scala | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala index d25a7cd5b439d..a3e40200bc063 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala @@ -290,6 +290,13 @@ class Word2Vec extends Serializable with Logging { val newSentences = sentences.repartition(numPartitions).cache() val initRandom = new XORShiftRandom(seed) + + if (vocabSize.toLong * vectorSize * 8 >= Int.MaxValue) { + throw new RuntimeException("Please increase minCount or decrease vectorSize in Word2Vec" + + " to avoid an OOM. You are highly recommended to make your vocabSize*vectorSize, " + + "which is " + vocabSize + "*" + vectorSize + " for now, less than `Int.MaxValue/8`.") + } + val syn0Global = Array.fill[Float](vocabSize * vectorSize)((initRandom.nextFloat() - 0.5f) / vectorSize) val syn1Global = new Array[Float](vocabSize * vectorSize) From 0a95085f09754c7b883f29a2babb17209c6541bd Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Fri, 30 Jan 2015 10:08:07 -0800 Subject: [PATCH 20/20] [SPARK-5496][MLLIB] Allow both classification and Classification in Algo for trees. to be backward compatible. Author: Xiangrui Meng Closes #4287 from mengxr/SPARK-5496 and squashes the following commits: a025c53 [Xiangrui Meng] Allow both classification and Classification in Algo for trees. --- .../org/apache/spark/mllib/tree/configuration/Algo.scala | 4 ++-- .../apache/spark/mllib/tree/GradientBoostedTreesSuite.scala | 5 +++++ 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/configuration/Algo.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/configuration/Algo.scala index 0ef9c6181a0a0..b6099259971b7 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/tree/configuration/Algo.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/configuration/Algo.scala @@ -29,8 +29,8 @@ object Algo extends Enumeration { val Classification, Regression = Value private[mllib] def fromString(name: String): Algo = name match { - case "classification" => Classification - case "regression" => Regression + case "classification" | "Classification" => Classification + case "regression" | "Regression" => Regression case _ => throw new IllegalArgumentException(s"Did not recognize Algo name: $name") } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/tree/GradientBoostedTreesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/tree/GradientBoostedTreesSuite.scala index 3aa97e544680b..e8341a5d0d104 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/tree/GradientBoostedTreesSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/tree/GradientBoostedTreesSuite.scala @@ -128,6 +128,11 @@ class GradientBoostedTreesSuite extends FunSuite with MLlibTestSparkContext { } } + test("SPARK-5496: BoostingStrategy.defaultParams should recognize Classification") { + for (algo <- Seq("classification", "Classification", "regression", "Regression")) { + BoostingStrategy.defaultParams(algo) + } + } } object GradientBoostedTreesSuite {