diff --git a/assembly/pom.xml b/assembly/pom.xml
index 594fa0c779e1b..1bb5a671f5390 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -43,12 +43,6 @@
-    <dependency>
-      <!-- Promote Guava to compile scope in this module so it's included while shading. -->
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-      <scope>compile</scope>
-    </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_${scala.binary.version}</artifactId>
@@ -133,22 +127,6 @@
               <goal>shade</goal>
-            <relocations>
-              <relocation>
-                <pattern>com.google</pattern>
-                <shadedPattern>org.spark-project.guava</shadedPattern>
-                <includes>
-                  <include>com.google.common.**</include>
-                </includes>
-                <excludes>
-                  <exclude>com/google/common/base/Absent*</exclude>
-                  <exclude>com/google/common/base/Function</exclude>
-                  <exclude>com/google/common/base/Optional*</exclude>
-                  <exclude>com/google/common/base/Present*</exclude>
-                  <exclude>com/google/common/base/Supplier</exclude>
-                </excludes>
-              </relocation>
-            </relocations>
diff --git a/core/pom.xml b/core/pom.xml
index 1984682b9c099..3c51b2d6b58f9 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -106,16 +106,6 @@
       <groupId>org.eclipse.jetty</groupId>
       <artifactId>jetty-server</artifactId>
     </dependency>
-    <dependency>
-      <!--
-        Promote Guava to compile scope in this module so it's included while shading.
-      -->
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-      <scope>compile</scope>
-    </dependency>
     <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-lang3</artifactId>
@@ -350,44 +340,6 @@
         true
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-shade-plugin</artifactId>
-        <executions>
-          <execution>
-            <phase>package</phase>
-            <goals>
-              <goal>shade</goal>
-            </goals>
-            <configuration>
-              <shadedArtifactAttached>false</shadedArtifactAttached>
-              <artifactSet>
-                <includes>
-                  <include>com.google.guava:guava</include>
-                </includes>
-              </artifactSet>
-              <filters>
-                <filter>
-                  <artifact>com.google.guava:guava</artifact>
-                  <includes>
-                    <include>com/google/common/base/Absent*</include>
-                    <include>com/google/common/base/Function</include>
-                    <include>com/google/common/base/Optional*</include>
-                    <include>com/google/common/base/Present*</include>
-                    <include>com/google/common/base/Supplier</include>
-                  </includes>
-                </filter>
-              </filters>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-dependency-plugin</artifactId>
diff --git a/examples/pom.xml b/examples/pom.xml
index 4b92147725f6b..8caad2bc2e27a 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -35,12 +35,6 @@
   <url>http://spark.apache.org/</url>
-    <dependency>
-      <!-- Promote Guava to compile scope in this module so it's included while shading. -->
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-      <scope>compile</scope>
-    </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_${scala.binary.version}</artifactId>
@@ -310,69 +304,40 @@
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-shade-plugin</artifactId>
-        <executions>
-          <execution>
-            <phase>package</phase>
-            <goals>
-              <goal>shade</goal>
-            </goals>
-            <configuration>
-              <shadedArtifactAttached>false</shadedArtifactAttached>
-              <outputFile>${project.build.directory}/scala-${scala.binary.version}/spark-examples-${project.version}-hadoop${hadoop.version}.jar</outputFile>
-              <artifactSet>
-                <includes>
-                  <include>*:*</include>
-                </includes>
-              </artifactSet>
-              <filters>
-                <filter>
-                  <artifact>com.google.guava:guava</artifact>
-                  <excludes>
-                    <exclude>**</exclude>
-                  </excludes>
-                </filter>
-                <filter>
-                  <artifact>*:*</artifact>
-                  <excludes>
-                    <exclude>META-INF/*.SF</exclude>
-                    <exclude>META-INF/*.DSA</exclude>
-                    <exclude>META-INF/*.RSA</exclude>
-                  </excludes>
-                </filter>
-              </filters>
-              <relocations>
-                <relocation>
-                  <pattern>com.google</pattern>
-                  <shadedPattern>org.spark-project.guava</shadedPattern>
-                  <includes>
-                    <include>com.google.common.**</include>
-                  </includes>
-                  <excludes>
-                    <exclude>com.google.common.base.Optional**</exclude>
-                  </excludes>
-                </relocation>
-                <relocation>
-                  <pattern>org.apache.commons.math3</pattern>
-                  <shadedPattern>org.spark-project.commons.math3</shadedPattern>
-                </relocation>
-              </relocations>
-              <transformers>
-                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
-                  <resource>reference.conf</resource>
-                </transformer>
-                <transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
-                  <resource>log4j.properties</resource>
-                </transformer>
-              </transformers>
-            </configuration>
-          </execution>
-        </executions>
+        <configuration>
+          <shadedArtifactAttached>false</shadedArtifactAttached>
+          <outputFile>${project.build.directory}/scala-${scala.binary.version}/spark-examples-${project.version}-hadoop${hadoop.version}.jar</outputFile>
+          <artifactSet>
+            <includes>
+              <include>*:*</include>
+            </includes>
+          </artifactSet>
+          <filters>
+            <filter>
+              <artifact>*:*</artifact>
+              <excludes>
+                <exclude>META-INF/*.SF</exclude>
+                <exclude>META-INF/*.DSA</exclude>
+                <exclude>META-INF/*.RSA</exclude>
+              </excludes>
+            </filter>
+          </filters>
+          <relocations>
+            <relocation>
+              <pattern>org.apache.commons.math3</pattern>
+              <shadedPattern>org.spark-project.commons.math3</shadedPattern>
+            </relocation>
+          </relocations>
+          <transformers>
+            <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+              <resource>reference.conf</resource>
+            </transformer>
+            <transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
+              <resource>log4j.properties</resource>
+            </transformer>
+          </transformers>
+        </configuration>
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
new file mode 100644
index 0000000000000..0ab74ba294535
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg.distributed
+
+import breeze.linalg.{DenseMatrix => BDM}
+
+import org.apache.spark.{Logging, Partitioner}
+import org.apache.spark.mllib.linalg.{DenseMatrix, Matrix}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * A grid partitioner, which uses a regular grid to partition coordinates.
+ *
+ * @param rows Number of rows.
+ * @param cols Number of columns.
+ * @param rowsPerPart Number of rows per partition, which may be less at the bottom edge.
+ * @param colsPerPart Number of columns per partition, which may be less at the right edge.
+ */
+private[mllib] class GridPartitioner(
+    val rows: Int,
+    val cols: Int,
+    val rowsPerPart: Int,
+    val colsPerPart: Int) extends Partitioner {
+
+  require(rows > 0)
+  require(cols > 0)
+  require(rowsPerPart > 0)
+  require(colsPerPart > 0)
+
+  // Use floating-point division before ceil; plain integer division would truncate first and
+  // undercount the partitions (e.g. ceil(7 / 2) would be 3 instead of ceil(3.5) = 4).
+  private val rowPartitions = math.ceil(rows * 1.0 / rowsPerPart).toInt
+  private val colPartitions = math.ceil(cols * 1.0 / colsPerPart).toInt
+
+  override val numPartitions = rowPartitions * colPartitions
+
+  /**
+   * Returns the index of the partition the input coordinate belongs to.
+   *
+   * @param key The coordinate (i, j) or a tuple (i, j, k), where k is the inner index used in
+   *            multiplication. k is ignored in computing partitions.
+   * @return The index of the partition, which the coordinate belongs to.
+   */
+  override def getPartition(key: Any): Int = {
+    key match {
+      case (i: Int, j: Int) =>
+        getPartitionId(i, j)
+      case (i: Int, j: Int, _: Int) =>
+        getPartitionId(i, j)
+      case _ =>
+        throw new IllegalArgumentException(s"Unrecognized key: $key.")
+    }
+  }
+
+  /** Partitions sub-matrices as blocks with neighboring sub-matrices. */
+  private def getPartitionId(i: Int, j: Int): Int = {
+    require(0 <= i && i < rows, s"Row index $i out of range [0, $rows).")
+    require(0 <= j && j < cols, s"Column index $j out of range [0, $cols).")
+    i / rowsPerPart + j / colsPerPart * rowPartitions
+  }
+
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case r: GridPartitioner =>
+        (this.rows == r.rows) && (this.cols == r.cols) &&
+          (this.rowsPerPart == r.rowsPerPart) && (this.colsPerPart == r.colsPerPart)
+      case _ =>
+        false
+    }
+  }
+
+  // Keep hashCode consistent with the equals override above, as the Object contract requires.
+  override def hashCode: Int =
+    41 * (41 * (41 * (41 + rows) + cols) + rowsPerPart) + colsPerPart
+}
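+
+// Worked example for the arithmetic above (illustrative only): a 4 x 7 coordinate space with
+// rowsPerPart = 1 and colsPerPart = 2 yields rowPartitions = ceil(4 / 1.0) = 4 and
+// colPartitions = ceil(7 / 2.0) = 4, hence numPartitions = 16; coordinate (3, 6) maps to
+// partition 3 / 1 + (6 / 2) * 4 = 15. These numbers mirror the `part0` case in BlockMatrixSuite.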
+
+private[mllib] object GridPartitioner {
+
+  /** Creates a new [[GridPartitioner]] instance. */
+  def apply(rows: Int, cols: Int, rowsPerPart: Int, colsPerPart: Int): GridPartitioner = {
+    new GridPartitioner(rows, cols, rowsPerPart, colsPerPart)
+  }
+
+  /** Creates a new [[GridPartitioner]] instance with the input suggested number of partitions. */
+  def apply(rows: Int, cols: Int, suggestedNumPartitions: Int): GridPartitioner = {
+    require(suggestedNumPartitions > 0)
+    val scale = 1.0 / math.sqrt(suggestedNumPartitions)
+    val rowsPerPart = math.round(math.max(scale * rows, 1.0)).toInt
+    val colsPerPart = math.round(math.max(scale * cols, 1.0)).toInt
+    new GridPartitioner(rows, cols, rowsPerPart, colsPerPart)
+  }
+}
+
+/**
+ * Represents a distributed matrix in blocks of local matrices.
+ *
+ * @param blocks The RDD of sub-matrix blocks ((blockRowIndex, blockColIndex), sub-matrix) that
+ *               form this distributed matrix.
+ * @param rowsPerBlock Number of rows that make up each block. The blocks forming the final
+ *                     rows are not required to have the given number of rows.
+ * @param colsPerBlock Number of columns that make up each block. The blocks forming the final
+ *                     columns are not required to have the given number of columns.
+ * @param nRows Number of rows of this matrix. If the supplied value is less than or equal to
+ *              zero, the number of rows will be calculated when `numRows` is invoked.
+ * @param nCols Number of columns of this matrix. If the supplied value is less than or equal to
+ *              zero, the number of columns will be calculated when `numCols` is invoked.
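+ *
+ * A minimal construction sketch (illustrative; assumes a SparkContext `sc` is in scope and the
+ * sample blocks are made up for the example):
+ * {{{
+ *   import org.apache.spark.mllib.linalg.{DenseMatrix, Matrix}
+ *   val blocks: Seq[((Int, Int), Matrix)] = Seq(
+ *     ((0, 0), new DenseMatrix(2, 2, Array(1.0, 0.0, 0.0, 2.0))),
+ *     ((1, 0), new DenseMatrix(1, 2, Array(3.0, 4.0))))
+ *   // A 3 x 2 matrix stored in 2 x 2 blocks: dimensions are inferred from the block indices.
+ *   val mat = new BlockMatrix(sc.parallelize(blocks, 2), 2, 2)
+ *   assert(mat.numRows() == 3 && mat.numCols() == 2)
+ * }}}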
+ */
+class BlockMatrix(
+    val blocks: RDD[((Int, Int), Matrix)],
+    val rowsPerBlock: Int,
+    val colsPerBlock: Int,
+    private var nRows: Long,
+    private var nCols: Long) extends DistributedMatrix with Logging {
+
+  private type MatrixBlock = ((Int, Int), Matrix) // ((blockRowIndex, blockColIndex), sub-matrix)
+
+  /**
+   * Alternate constructor for BlockMatrix without the input of the number of rows and columns.
+   *
+   * @param rdd The RDD of sub-matrix blocks (local matrices) that form this matrix.
+   * @param rowsPerBlock Number of rows that make up each block. The blocks forming the final
+   *                     rows are not required to have the given number of rows.
+   * @param colsPerBlock Number of columns that make up each block. The blocks forming the final
+   *                     columns are not required to have the given number of columns.
+   */
+  def this(
+      rdd: RDD[((Int, Int), Matrix)],
+      rowsPerBlock: Int,
+      colsPerBlock: Int) = {
+    this(rdd, rowsPerBlock, colsPerBlock, 0L, 0L)
+  }
+
+  override def numRows(): Long = {
+    if (nRows <= 0L) estimateDim()
+    nRows
+  }
+
+  override def numCols(): Long = {
+    if (nCols <= 0L) estimateDim()
+    nCols
+  }
+
+  val numRowBlocks = math.ceil(numRows() * 1.0 / rowsPerBlock).toInt
+  val numColBlocks = math.ceil(numCols() * 1.0 / colsPerBlock).toInt
+
+  private[mllib] var partitioner: GridPartitioner =
+    GridPartitioner(numRowBlocks, numColBlocks, suggestedNumPartitions = blocks.partitions.size)
+
+  /** Estimates the dimensions of the matrix. */
+  private def estimateDim(): Unit = {
+    val (rows, cols) = blocks.map { case ((blockRowIndex, blockColIndex), mat) =>
+      (blockRowIndex.toLong * rowsPerBlock + mat.numRows,
+        blockColIndex.toLong * colsPerBlock + mat.numCols)
+    }.reduce { (x0, x1) =>
+      (math.max(x0._1, x1._1), math.max(x0._2, x1._2))
+    }
+    if (nRows <= 0L) nRows = rows
+    assert(rows <= nRows, s"The number of rows $rows is more than claimed $nRows.")
+    if (nCols <= 0L) nCols = cols
+    assert(cols <= nCols, s"The number of columns $cols is more than claimed $nCols.")
+  }
+
+  /** Caches the underlying RDD. */
+  def cache(): this.type = {
+    blocks.cache()
+    this
+  }
+
+  /** Persists the underlying RDD with the specified storage level. */
+  def persist(storageLevel: StorageLevel): this.type = {
+    blocks.persist(storageLevel)
+    this
+  }
+
+  /** Collect the distributed matrix on the driver as a `DenseMatrix`. */
+  def toLocalMatrix(): Matrix = {
+    require(numRows() < Int.MaxValue, "The number of rows of this matrix should be less than " +
+      s"Int.MaxValue. Currently numRows: ${numRows()}")
+    require(numCols() < Int.MaxValue, "The number of columns of this matrix should be less than " +
+      s"Int.MaxValue. Currently numCols: ${numCols()}")
+    require(numRows() * numCols() < Int.MaxValue, "The length of the values array must be " +
+      s"less than Int.MaxValue. Currently numRows * numCols: ${numRows() * numCols()}")
+    val m = numRows().toInt
+    val n = numCols().toInt
+    // Each Double takes 8 bytes, so m * n / 125000 approximates the footprint in megabytes.
+    val mem = m * n / 125000
+    if (mem > 500) logWarning(s"Storing this matrix will require $mem MB of memory!")
+
+    val localBlocks = blocks.collect()
+    val values = new Array[Double](m * n)
+    localBlocks.foreach { case ((blockRowIndex, blockColIndex), submat) =>
+      val rowOffset = blockRowIndex * rowsPerBlock
+      val colOffset = blockColIndex * colsPerBlock
+      submat.foreachActive { (i, j, v) =>
+        val indexOffset = (j + colOffset) * m + rowOffset + i
+        values(indexOffset) = v
+      }
+    }
+    new DenseMatrix(m, n, values)
+  }
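+
+  // Worked example for the column-major index arithmetic in toLocalMatrix (illustrative): with
+  // m = 3 total rows, rowsPerBlock = 2 and colsPerBlock = 2, entry (i = 0, j = 1) of block
+  // (1, 0) sits at global (row 2, column 1), i.e. values((1 + 0) * 3 + 2 + 0) = values(5).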
+
+  /** Collects data and assembles a local dense breeze matrix (for test only). */
+  private[mllib] def toBreeze(): BDM[Double] = {
+    val localMat = toLocalMatrix()
+    new BDM[Double](localMat.numRows, localMat.numCols, localMat.toArray)
+  }
+}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
new file mode 100644
index 0000000000000..05efbc8e8d0b8
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg.distributed
+
+import scala.util.Random
+
+import breeze.linalg.{DenseMatrix => BDM}
+import org.scalatest.FunSuite
+
+import org.apache.spark.mllib.linalg.{DenseMatrix, Matrices, Matrix}
+import org.apache.spark.mllib.util.MLlibTestSparkContext
+
+class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext {
+
+  val m = 5
+  val n = 4
+  val rowPerPart = 2
+  val colPerPart = 2
+  val numPartitions = 3
+  var gridBasedMat: BlockMatrix = _
+
+  override def beforeAll() {
+    super.beforeAll()
+
+    val blocks: Seq[((Int, Int), Matrix)] = Seq(
+      ((0, 0), new DenseMatrix(2, 2, Array(1.0, 0.0, 0.0, 2.0))),
+      ((0, 1), new DenseMatrix(2, 2, Array(0.0, 1.0, 0.0, 0.0))),
+      ((1, 0), new DenseMatrix(2, 2, Array(3.0, 0.0, 1.0, 1.0))),
+      ((1, 1), new DenseMatrix(2, 2, Array(1.0, 2.0, 0.0, 1.0))),
+      ((2, 1), new DenseMatrix(1, 2, Array(1.0, 5.0))))
+
+    gridBasedMat = new BlockMatrix(sc.parallelize(blocks, numPartitions), rowPerPart, colPerPart)
+  }
+
+  test("size") {
+    assert(gridBasedMat.numRows() === m)
+    assert(gridBasedMat.numCols() === n)
+  }
+
+  test("grid partitioner") {
+    val random = new Random()
+    // This should generate a 4x4 grid of 1x2 blocks.
+    val part0 = GridPartitioner(4, 7, suggestedNumPartitions = 12)
+    val expected0 = Array(
+      Array(0, 0, 4, 4, 8, 8, 12),
+      Array(1, 1, 5, 5, 9, 9, 13),
+      Array(2, 2, 6, 6, 10, 10, 14),
+      Array(3, 3, 7, 7, 11, 11, 15))
+    for (i <- 0 until 4; j <- 0 until 7) {
+      assert(part0.getPartition((i, j)) === expected0(i)(j))
+      assert(part0.getPartition((i, j, random.nextInt())) === expected0(i)(j))
+    }
+
+    intercept[IllegalArgumentException] {
+      part0.getPartition((-1, 0))
+    }
+
+    intercept[IllegalArgumentException] {
+      part0.getPartition((4, 0))
+    }
+
+    intercept[IllegalArgumentException] {
+      part0.getPartition((0, -1))
+    }
+
+    intercept[IllegalArgumentException] {
+      part0.getPartition((0, 7))
+    }
+
+    val part1 = GridPartitioner(2, 2, suggestedNumPartitions = 5)
+    val expected1 = Array(
+      Array(0, 2),
+      Array(1, 3))
+    for (i <- 0 until 2; j <- 0 until 2) {
+      assert(part1.getPartition((i, j)) === expected1(i)(j))
+      assert(part1.getPartition((i, j, random.nextInt())) === expected1(i)(j))
+    }
+
+    val part2 = GridPartitioner(2, 2, suggestedNumPartitions = 5)
+    assert(part0 !== part2)
+    assert(part1 === part2)
+
+    val part3 = new GridPartitioner(2, 3, rowsPerPart = 1, colsPerPart = 2)
+    val expected3 = Array(
+      Array(0, 0, 2),
+      Array(1, 1, 3))
+    for (i <- 0 until 2; j <- 0 until 3) {
+      assert(part3.getPartition((i, j)) === expected3(i)(j))
+      assert(part3.getPartition((i, j, random.nextInt())) === expected3(i)(j))
+    }
+
+    val part4 = GridPartitioner(2, 3, rowsPerPart = 1, colsPerPart = 2)
+    assert(part3 === part4)
+
+    intercept[IllegalArgumentException] {
+      new GridPartitioner(2, 2, rowsPerPart = 0, colsPerPart = 1)
+    }
+
+    intercept[IllegalArgumentException] {
+      GridPartitioner(2, 2, rowsPerPart = 1, colsPerPart = 0)
+    }
+
+    intercept[IllegalArgumentException] {
+      GridPartitioner(2, 2, suggestedNumPartitions = 0)
+    }
+  }
+
+  test("toBreeze and toLocalMatrix") {
+    val expected = BDM(
+      (1.0, 0.0, 0.0, 0.0),
+      (0.0, 2.0, 1.0, 0.0),
+      (3.0, 1.0, 1.0, 0.0),
+      (0.0, 1.0, 2.0, 1.0),
+      (0.0, 0.0, 1.0, 5.0))
+
+    val dense = Matrices.fromBreeze(expected).asInstanceOf[DenseMatrix]
+    assert(gridBasedMat.toLocalMatrix() === dense)
+    assert(gridBasedMat.toBreeze() === expected)
+  }
+}
diff --git a/network/common/pom.xml b/network/common/pom.xml
index 245a96b8c4038..5a9bbe105d9f1 100644
--- a/network/common/pom.xml
+++ b/network/common/pom.xml
@@ -48,10 +48,15 @@
       <artifactId>slf4j-api</artifactId>
       <scope>provided</scope>
     </dependency>
+
+    <!--
+      Promote Guava to "compile" scope in this module so it is picked up and shaded by the
+      maven-shade-plugin configuration below.
+    -->
     <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
-      <scope>provided</scope>
+      <scope>compile</scope>
     </dependency>
@@ -87,11 +92,6 @@
         <artifactId>maven-jar-plugin</artifactId>
         <version>2.2</version>
         <executions>
-          <execution>
-            <goals>
-              <goal>test-jar</goal>
-            </goals>
-          </execution>
           <execution>
             <id>test-jar-on-test-compile</id>
             <phase>test-compile</phase>
@@ -101,6 +101,18 @@
         </executions>
       </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <configuration>
+          <shadedArtifactAttached>false</shadedArtifactAttached>
+          <artifactSet>
+            <includes>
+              <include>com.google.guava:guava</include>
+            </includes>
+          </artifactSet>
+        </configuration>
+      </plugin>
diff --git a/network/shuffle/pom.xml b/network/shuffle/pom.xml
index 5bfa1ac9c373e..c2d0300ecd904 100644
--- a/network/shuffle/pom.xml
+++ b/network/shuffle/pom.xml
@@ -52,7 +52,6 @@
     <dependency>
       <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
-      <scope>provided</scope>
     </dependency>
diff --git a/pom.xml b/pom.xml
index 05cb3797fc55b..4adfdf3eb8702 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1264,7 +1264,10 @@
-
+
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-shade-plugin</artifactId>
@@ -1276,6 +1279,23 @@
               <include>org.spark-project.spark:unused</include>
             </includes>
           </artifactSet>
+          <relocations>
+            <relocation>
+              <pattern>com.google.common</pattern>
+              <shadedPattern>org.spark-project.guava</shadedPattern>
+              <excludes>
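+                <!--
+                  Guava's Optional is exposed in Spark's public Java API, and the other classes
+                  listed here are referenced by Optional, so they must keep their original,
+                  un-relocated names.
+                -->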
+                <exclude>com/google/common/base/Absent*</exclude>
+                <exclude>com/google/common/base/Function</exclude>
+                <exclude>com/google/common/base/Optional*</exclude>
+                <exclude>com/google/common/base/Present*</exclude>
+                <exclude>com/google/common/base/Supplier</exclude>
+              </excludes>
+            </relocation>
+          </relocations>
diff --git a/project/build.properties b/project/build.properties
index 32a3aeefaf9fb..064ec843da9ea 100644
--- a/project/build.properties
+++ b/project/build.properties
@@ -14,4 +14,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-sbt.version=0.13.6
+sbt.version=0.13.7
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
index 8df150e2f855f..73ec7a6d114f5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
@@ -114,7 +114,6 @@ class GenericRow(protected[sql] val values: Array[Any]) extends Row {
   }
 
   override def getString(i: Int): String = {
-    if (values(i) == null) sys.error("Failed to check null bit for primitive String value.")
     values(i).asInstanceOf[String]
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
index 7fc8347428df4..7f20cf8d76797 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
@@ -252,7 +252,10 @@ class Column(
   /**
    * Equality test with an expression that is safe for null values.
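+   *
+   * A usage sketch (illustrative; assumes a DataFrame `nullData` with a nullable column "b",
+   * as in the test suite below):
+   * {{{
+   *   nullData.filter($"b" <=> null)   // keeps the rows where b is null
+   *   nullData.filter($"b" <=> 1)      // null-safe equality against a literal
+   * }}}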
    */
-  override def <=> (other: Column): Column = EqualNullSafe(expr, other.expr)
+  override def <=> (other: Column): Column = other match {
+    case null => EqualNullSafe(expr, Literal.anyToLiteral(null).expr)
+    case _ => EqualNullSafe(expr, other.expr)
+  }
 
   /**
    * Equality test with a literal value that is safe for null values.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index d0bb3640f8c1c..3198215b2c3ab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -230,9 +230,12 @@ class DataFrame protected[sql](
   /**
    * Selects a single column and returns it as a [[Column]].
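+   *
+   * For example (illustrative; `df` and the column name are made up):
+   * {{{
+   *   df("colA")  // a single named column
+   *   df("*")     // all columns, equivalent to $"*"
+   * }}}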
    */
-  override def apply(colName: String): Column = {
-    val expr = resolve(colName)
-    new Column(Some(sqlContext), Some(Project(Seq(expr), logicalPlan)), expr)
+  override def apply(colName: String): Column = colName match {
+    case "*" =>
+      Column("*")
+    case _ =>
+      val expr = resolve(colName)
+      new Column(Some(sqlContext), Some(Project(Seq(expr), logicalPlan)), expr)
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/dsl/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/dsl/package.scala
index 29c3d26ae56d9..4c44e178b9976 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/dsl/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/dsl/package.scala
@@ -53,6 +53,7 @@ package object dsl {
   def last(e: Column): Column = Last(e.expr)
   def min(e: Column): Column = Min(e.expr)
   def max(e: Column): Column = Max(e.expr)
+  def upper(e: Column): Column = Upper(e.expr)
   def lower(e: Column): Column = Lower(e.expr)
   def sqrt(e: Column): Column = Sqrt(e.expr)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
new file mode 100644
index 0000000000000..825a1862ba6ff
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.dsl._
+import org.apache.spark.sql.test.TestSQLContext
+import org.apache.spark.sql.types.{BooleanType, IntegerType, StructField, StructType}
+
+
+class ColumnExpressionSuite extends QueryTest {
+  import org.apache.spark.sql.TestData._
+
+  // TODO: Add test cases for bitwise operations.
+
+  test("star") {
+    checkAnswer(testData.select($"*"), testData.collect().toSeq)
+  }
+
+  ignore("star qualified by data frame object") {
+    // This is not yet supported.
+    val df = testData.toDF
+    checkAnswer(df.select(df("*")), df.collect().toSeq)
+  }
+
+  test("star qualified by table name") {
+    checkAnswer(testData.as("testData").select($"testData.*"), testData.collect().toSeq)
+  }
+
+  test("+") {
+    checkAnswer(
+      testData2.select($"a" + 1),
+      testData2.collect().toSeq.map(r => Row(r.getInt(0) + 1)))
+
+    checkAnswer(
+      testData2.select($"a" + $"b" + 2),
+      testData2.collect().toSeq.map(r => Row(r.getInt(0) + r.getInt(1) + 2)))
+  }
+
+  test("-") {
+    checkAnswer(
+      testData2.select($"a" - 1),
+      testData2.collect().toSeq.map(r => Row(r.getInt(0) - 1)))
+
+    checkAnswer(
+      testData2.select($"a" - $"b" - 2),
+      testData2.collect().toSeq.map(r => Row(r.getInt(0) - r.getInt(1) - 2)))
+  }
+
+  test("*") {
+    checkAnswer(
+      testData2.select($"a" * 10),
+      testData2.collect().toSeq.map(r => Row(r.getInt(0) * 10)))
+
+    checkAnswer(
+      testData2.select($"a" * $"b"),
+      testData2.collect().toSeq.map(r => Row(r.getInt(0) * r.getInt(1))))
+  }
+
+  test("/") {
+    checkAnswer(
+      testData2.select($"a" / 2),
+      testData2.collect().toSeq.map(r => Row(r.getInt(0).toDouble / 2)))
+
+    checkAnswer(
+      testData2.select($"a" / $"b"),
+      testData2.collect().toSeq.map(r => Row(r.getInt(0).toDouble / r.getInt(1))))
+  }
+
+  test("%") {
+    checkAnswer(
+      testData2.select($"a" % 2),
+      testData2.collect().toSeq.map(r => Row(r.getInt(0) % 2)))
+
+    checkAnswer(
+      testData2.select($"a" % $"b"),
+      testData2.collect().toSeq.map(r => Row(r.getInt(0) % r.getInt(1))))
+  }
+
+  test("unary -") {
+    checkAnswer(
+      testData2.select(-$"a"),
+      testData2.collect().toSeq.map(r => Row(-r.getInt(0))))
+  }
+
+  test("unary !") {
+    checkAnswer(
+      complexData.select(!$"b"),
+      complexData.collect().toSeq.map(r => Row(!r.getBoolean(3))))
+  }
+
+  test("isNull") {
+    checkAnswer(
+      nullStrings.toDF.where($"s".isNull),
+      nullStrings.collect().toSeq.filter(r => r.getString(1) eq null))
+  }
+
+  test("isNotNull") {
+    checkAnswer(
+      nullStrings.toDF.where($"s".isNotNull),
+      nullStrings.collect().toSeq.filter(r => r.getString(1) ne null))
+  }
+
+  test("===") {
+    checkAnswer(
+      testData2.filter($"a" === 1),
+      testData2.collect().toSeq.filter(r => r.getInt(0) == 1))
+
+    checkAnswer(
+      testData2.filter($"a" === $"b"),
+      testData2.collect().toSeq.filter(r => r.getInt(0) == r.getInt(1)))
+  }
+
+  test("<=>") {
+    checkAnswer(
+      testData2.filter($"a" <=> 1),
+      testData2.collect().toSeq.filter(r => r.getInt(0) == 1))
+
+    checkAnswer(
+      testData2.filter($"a" <=> $"b"),
+      testData2.collect().toSeq.filter(r => r.getInt(0) == r.getInt(1)))
+
+    val nullData = TestSQLContext.applySchema(TestSQLContext.sparkContext.parallelize(
+      Row(1, 1) ::
+      Row(1, 2) ::
+      Row(1, null) ::
+      Row(null, null) :: Nil),
+      StructType(Seq(StructField("a", IntegerType), StructField("b", IntegerType))))
+
+    checkAnswer(
+      nullData.filter($"b" <=> 1),
+      Row(1, 1) :: Nil)
+
+    checkAnswer(
+      nullData.filter($"b" <=> null),
+      Row(1, null) :: Row(null, null) :: Nil)
+
+    checkAnswer(
+      nullData.filter($"a" <=> $"b"),
+      Row(1, 1) :: Row(null, null) :: Nil)
+  }
+
+  test("!==") {
+    checkAnswer(
+      testData2.filter($"a" !== $"b"),
+      testData2.collect().toSeq.filter(r => r.getInt(0) != r.getInt(1)))
+  }
+
+  test(">") {
+    checkAnswer(
+      testData2.filter($"a" > 1),
+      testData2.collect().toSeq.filter(r => r.getInt(0) > 1))
+
+    checkAnswer(
+      testData2.filter($"a" > $"b"),
+      testData2.collect().toSeq.filter(r => r.getInt(0) > r.getInt(1)))
+  }
+
+  test(">=") {
+    checkAnswer(
+      testData2.filter($"a" >= 1),
+      testData2.collect().toSeq.filter(r => r.getInt(0) >= 1))
+
+    checkAnswer(
+      testData2.filter($"a" >= $"b"),
+      testData2.collect().toSeq.filter(r => r.getInt(0) >= r.getInt(1)))
+  }
+
+  test("<") {
+    checkAnswer(
+      testData2.filter($"a" < 2),
+      testData2.collect().toSeq.filter(r => r.getInt(0) < 2))
+
+    checkAnswer(
+      testData2.filter($"a" < $"b"),
+      testData2.collect().toSeq.filter(r => r.getInt(0) < r.getInt(1)))
+  }
+
+  test("<=") {
+    checkAnswer(
+      testData2.filter($"a" <= 2),
+      testData2.collect().toSeq.filter(r => r.getInt(0) <= 2))
+
+    checkAnswer(
+      testData2.filter($"a" <= $"b"),
+      testData2.collect().toSeq.filter(r => r.getInt(0) <= r.getInt(1)))
+  }
+
+  val booleanData = TestSQLContext.applySchema(TestSQLContext.sparkContext.parallelize(
+    Row(false, false) ::
+    Row(false, true) ::
+    Row(true, false) ::
+    Row(true, true) :: Nil),
+    StructType(Seq(StructField("a", BooleanType), StructField("b", BooleanType))))
+
+  test("&&") {
+    checkAnswer(
+      booleanData.filter($"a" && true),
+      Row(true, false) :: Row(true, true) :: Nil)
+
+    checkAnswer(
+      booleanData.filter($"a" && false),
+      Nil)
+
+    checkAnswer(
+      booleanData.filter($"a" && $"b"),
+      Row(true, true) :: Nil)
+  }
+
+  test("||") {
+    checkAnswer(
+      booleanData.filter($"a" || true),
+      booleanData.collect())
+
+    checkAnswer(
+      booleanData.filter($"a" || false),
+      Row(true, false) :: Row(true, true) :: Nil)
+
+    checkAnswer(
+      booleanData.filter($"a" || $"b"),
+      Row(false, true) :: Row(true, false) :: Row(true, true) :: Nil)
+  }
+
+  test("sqrt") {
+    checkAnswer(
+      testData.select(sqrt('key)).orderBy('key.asc),
+      (1 to 100).map(n => Row(math.sqrt(n)))
+    )
+
+    checkAnswer(
+      testData.select(sqrt('value), 'key).orderBy('key.asc, 'value.asc),
+      (1 to 100).map(n => Row(math.sqrt(n), n))
+    )
+
+    checkAnswer(
+      testData.select(sqrt(Literal(null))),
+      (1 to 100).map(_ => Row(null))
+    )
+  }
+
+  test("abs") {
+    checkAnswer(
+      testData.select(abs('key)).orderBy('key.asc),
+      (1 to 100).map(n => Row(n))
+    )
+
+    checkAnswer(
+      negativeData.select(abs('key)).orderBy('key.desc),
+      (1 to 100).map(n => Row(n))
+    )
+
+    checkAnswer(
+      testData.select(abs(Literal(null))),
+      (1 to 100).map(_ => Row(null))
+    )
+  }
+
+  test("upper") {
+    checkAnswer(
+      lowerCaseData.select(upper('l)),
+      ('a' to 'd').map(c => Row(c.toString.toUpperCase))
+    )
+
+    checkAnswer(
+      testData.select(upper('value), 'key),
+      (1 to 100).map(n => Row(n.toString, n))
+    )
+
+    checkAnswer(
+      testData.select(upper(Literal(null))),
+      (1 to 100).map(_ => Row(null))
+    )
+  }
+
+  test("lower") {
+    checkAnswer(
+      upperCaseData.select(lower('L)),
+      ('A' to 'F').map(c => Row(c.toString.toLowerCase))
+    )
+
+    checkAnswer(
+      testData.select(lower('value), 'key),
+      (1 to 100).map(n => Row(n.toString, n))
+    )
+
+    checkAnswer(
+      testData.select(lower(Literal(null))),
+      (1 to 100).map(_ => Row(null))
+    )
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
similarity index 82%
rename from sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala
rename to sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index a5848f219cea9..6d7d5aa49358b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -25,7 +25,7 @@
 import org.apache.spark.sql.test.TestSQLContext._
 
 import scala.language.postfixOps
 
-class DslQuerySuite extends QueryTest {
+class DataFrameSuite extends QueryTest {
   import org.apache.spark.sql.TestData._
 
   test("table scan") {
@@ -276,71 +276,5 @@
     )
   }
 
-  test("sqrt") {
-    checkAnswer(
-      testData.select(sqrt('key)).orderBy('key asc),
-      (1 to 100).map(n => Row(math.sqrt(n)))
-    )
-
-    checkAnswer(
-      testData.select(sqrt('value), 'key).orderBy('key asc, 'value asc),
-      (1 to 100).map(n => Row(math.sqrt(n), n))
-    )
-
-    checkAnswer(
-      testData.select(sqrt(Literal(null))),
-      (1 to 100).map(_ => Row(null))
-    )
-  }
-
-  test("abs") {
-    checkAnswer(
-      testData.select(abs('key)).orderBy('key asc),
-      (1 to 100).map(n => Row(n))
-    )
-
-    checkAnswer(
-      negativeData.select(abs('key)).orderBy('key desc),
-      (1 to 100).map(n => Row(n))
-    )
-
-    checkAnswer(
-      testData.select(abs(Literal(null))),
-      (1 to 100).map(_ => Row(null))
-    )
-  }
-
-  test("upper") {
-    checkAnswer(
-      lowerCaseData.select(upper('l)),
-      ('a' to 'd').map(c => Row(c.toString.toUpperCase))
-    )
-
-    checkAnswer(
-      testData.select(upper('value), 'key),
-      (1 to 100).map(n => Row(n.toString, n))
-    )
-
-    checkAnswer(
-      testData.select(upper(Literal(null))),
-      (1 to 100).map(n => Row(null))
-    )
-  }
-
-  test("lower") {
-    checkAnswer(
-      upperCaseData.select(lower('L)),
-      ('A' to 'F').map(c => Row(c.toString.toLowerCase))
-    )
-
-    checkAnswer(
-      testData.select(lower('value), 'key),
-      (1 to 100).map(n => Row(n.toString, n))
-    )
-
-    checkAnswer(
-      testData.select(lower(Literal(null))),
-      (1 to 100).map(n => Row(null))
-    )
-  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
index fffa2b7dfa6e1..9eefe67c04434 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
@@ -161,7 +161,7 @@ object TestData {
     TestSQLContext.sparkContext.parallelize(
       NullStrings(1, "abc") ::
       NullStrings(2, "ABC") ::
-      NullStrings(3, null) :: Nil)
+      NullStrings(3, null) :: Nil).toDF
   nullStrings.registerTempTable("nullStrings")
 
   case class TableName(tableName: String)
diff --git a/streaming/pom.xml b/streaming/pom.xml
index 22b0d714b57f6..98f5b41de84a1 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -95,6 +95,14 @@
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <configuration>
+          <shadeTestJar>true</shadeTestJar>
+        </configuration>
+      </plugin>