Merge branch 'master' of github.com:apache/spark into use-session-catalog
Andrew Or committed Mar 23, 2016
2 parents 5ea8469 + 1a22cf1 commit 9519cd8
Showing 21 changed files with 59 additions and 53 deletions.
18 changes: 17 additions & 1 deletion core/src/main/scala/org/apache/spark/util/Benchmark.scala
@@ -64,6 +64,7 @@ private[spark] class Benchmark(

val firstBest = results.head.bestMs
// The results are going to be processor-specific, so it is useful to include that.
println(Benchmark.getJVMOSInfo())
println(Benchmark.getProcessorName())
printf("%-35s %16s %12s %13s %10s\n", name + ":", "Best/Avg Time(ms)", "Rate(M/s)",
"Per Row(ns)", "Relative")
@@ -91,16 +92,31 @@ private[spark] object Benchmark {
* This should return something like "Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz"
*/
def getProcessorName(): String = {
if (SystemUtils.IS_OS_MAC_OSX) {
val cpu = if (SystemUtils.IS_OS_MAC_OSX) {
Utils.executeAndGetOutput(Seq("/usr/sbin/sysctl", "-n", "machdep.cpu.brand_string"))
} else if (SystemUtils.IS_OS_LINUX) {
Try {
val grepPath = Utils.executeAndGetOutput(Seq("which", "grep"))
Utils.executeAndGetOutput(Seq(grepPath, "-m", "1", "model name", "/proc/cpuinfo"))
.replaceFirst("model name[\\s*]:[\\s*]", "")
}.getOrElse("Unknown processor")
} else {
System.getenv("PROCESSOR_IDENTIFIER")
}
cpu
}

/**
* This should return user-helpful JVM & OS information.
* This should return something like
* "OpenJDK 64-Bit Server VM 1.8.0_65-b17 on Linux 4.1.13-100.fc21.x86_64"
*/
def getJVMOSInfo(): String = {
val vmName = System.getProperty("java.vm.name")
val runtimeVersion = System.getProperty("java.runtime.version")
val osName = System.getProperty("os.name")
val osVersion = System.getProperty("os.version")
s"${vmName} ${runtimeVersion} on ${osName} ${osVersion}"
}

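For context, a minimal standalone sketch of the environment header the benchmark now prints; it uses only the four system properties read by `getJVMOSInfo` above (the `EnvHeader` object name is illustrative):

```scala
// Prints a line like:
// "OpenJDK 64-Bit Server VM 1.8.0_65-b17 on Linux 4.1.13-100.fc21.x86_64"
object EnvHeader {
  def main(args: Array[String]): Unit = {
    val vmName = System.getProperty("java.vm.name")
    val runtimeVersion = System.getProperty("java.runtime.version")
    val osName = System.getProperty("os.name")
    val osVersion = System.getProperty("os.version")
    println(s"$vmName $runtimeVersion on $osName $osVersion")
  }
}
```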
9 changes: 5 additions & 4 deletions sql/README.md
@@ -47,7 +47,7 @@ An interactive scala console can be invoked by running `build/sbt hive/console`.
From here you can execute queries with HiveQL and manipulate DataFrames using the DSL.

```scala
catalyst$ build/sbt hive/console
$ build/sbt hive/console

[info] Starting scala interpreter...
import org.apache.spark.sql.catalyst.analysis._
@@ -61,22 +61,23 @@ import org.apache.spark.sql.execution
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
import org.apache.spark.sql.types._
Type in expressions to have them evaluated.
Type :help for more information.

scala> val query = sql("SELECT * FROM (SELECT * FROM src) a")
query: org.apache.spark.sql.DataFrame = org.apache.spark.sql.DataFrame@74448eed
query: org.apache.spark.sql.DataFrame = [key: int, value: string]
```

Query results are `DataFrames` and can be operated on as such.
```
scala> query.collect()
res2: Array[org.apache.spark.sql.Row] = Array([238,val_238], [86,val_86], [311,val_311], [27,val_27]...
res0: Array[org.apache.spark.sql.Row] = Array([238,val_238], [86,val_86], [311,val_311], [27,val_27]...
```

You can also build further queries on top of these `DataFrames` using the query DSL.
```
scala> query.where(query("key") > 30).select(avg(query("key"))).collect()
res3: Array[org.apache.spark.sql.Row] = Array([274.79025423728814])
res1: Array[org.apache.spark.sql.Row] = Array([274.79025423728814])
```
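The DSL composes like any other DataFrame transformation, so further operators can be chained onto `query`; a hypothetical follow-on (output elided):

```
scala> query.groupBy(query("value")).count().collect()
```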
@@ -748,7 +748,7 @@ case class Round(child: Expression, scale: Expression)
if (f.isNaN || f.isInfinite) {
f
} else {
BigDecimal(f).setScale(_scale, HALF_UP).toFloat
BigDecimal(f.toDouble).setScale(_scale, HALF_UP).toFloat
}
case DoubleType =>
val d = input1.asInstanceOf[Double]
@@ -804,39 +804,21 @@ case class Round(child: Expression, scale: Expression)
s"${ev.value} = ${ce.value};"
}
case FloatType => // if the child evaluates to NaN or Infinity, just return it.
if (_scale == 0) {
s"""
if (Float.isNaN(${ce.value}) || Float.isInfinite(${ce.value})) {
${ev.value} = ${ce.value};
} else {
${ev.value} = Math.round(${ce.value});
}"""
} else {
s"""
if (Float.isNaN(${ce.value}) || Float.isInfinite(${ce.value})) {
${ev.value} = ${ce.value};
} else {
${ev.value} = java.math.BigDecimal.valueOf(${ce.value}).
setScale(${_scale}, java.math.BigDecimal.ROUND_HALF_UP).floatValue();
}"""
}
s"""
if (Float.isNaN(${ce.value}) || Float.isInfinite(${ce.value})) {
${ev.value} = ${ce.value};
} else {
${ev.value} = java.math.BigDecimal.valueOf(${ce.value}).
setScale(${_scale}, java.math.BigDecimal.ROUND_HALF_UP).floatValue();
}"""
case DoubleType => // if the child evaluates to NaN or Infinity, just return it.
if (_scale == 0) {
s"""
if (Double.isNaN(${ce.value}) || Double.isInfinite(${ce.value})) {
${ev.value} = ${ce.value};
} else {
${ev.value} = Math.round(${ce.value});
}"""
} else {
s"""
if (Double.isNaN(${ce.value}) || Double.isInfinite(${ce.value})) {
${ev.value} = ${ce.value};
} else {
${ev.value} = java.math.BigDecimal.valueOf(${ce.value}).
setScale(${_scale}, java.math.BigDecimal.ROUND_HALF_UP).doubleValue();
}"""
}
s"""
if (Double.isNaN(${ce.value}) || Double.isInfinite(${ce.value})) {
${ev.value} = ${ce.value};
} else {
${ev.value} = java.math.BigDecimal.valueOf(${ce.value}).
setScale(${_scale}, java.math.BigDecimal.ROUND_HALF_UP).doubleValue();
}"""
}

if (scaleV == null) { // if scale is null, no need to eval its child at all
@@ -22,7 +22,7 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, TreeNode}
import org.apache.spark.sql.catalyst.trees.CurrentOrigin
import org.apache.spark.sql.types.StructType


@@ -553,5 +553,9 @@ class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(Round(Literal.create(null, dataType),
Literal.create(null, IntegerType)), null)
}

checkEvaluation(Round(-3.5, 0), -4.0)
checkEvaluation(Round(-0.35, 1), -0.4)
checkEvaluation(Round(-35, -1), -40)
}
}
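The new expectations can be reproduced with plain `java.math.BigDecimal`, which is also consistent with dropping the `Math.round` fast path in the generated code above: `Math.round` breaks ties toward positive infinity, while `ROUND_HALF_UP` breaks them away from zero. A minimal sketch, no Spark required:

```scala
import java.math.{BigDecimal => JBigDecimal, RoundingMode}

object HalfUpDemo {
  def main(args: Array[String]): Unit = {
    // Ties toward positive infinity: prints -3, not -4.
    println(Math.round(-3.5))
    // HALF_UP: ties away from zero, matching the Round expression's semantics.
    println(JBigDecimal.valueOf(-3.5).setScale(0, RoundingMode.HALF_UP))  // -4
    println(JBigDecimal.valueOf(-0.35).setScale(1, RoundingMode.HALF_UP)) // -0.4
  }
}
```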
@@ -21,11 +21,7 @@ import scala.language.implicitConversions
import scala.reflect.runtime.universe.TypeTag

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

/**
* A collection of implicit methods for converting common Scala objects into [[DataFrame]]s.
@@ -18,7 +18,6 @@ package org.apache.spark.sql.execution

import org.apache.spark.sql.{AnalysisException, SaveMode}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.parser._
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
import org.apache.spark.sql.execution.command._
@@ -22,7 +22,6 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.toCommentSafeString
@@ -129,6 +129,7 @@ public org.apache.spark.sql.execution.datasources.parquet.test.avro.AvroArrayOfA
}

@Override
@SuppressWarnings(value="unchecked")
public AvroArrayOfArray build() {
try {
AvroArrayOfArray record = new AvroArrayOfArray();
@@ -129,6 +129,7 @@ public org.apache.spark.sql.execution.datasources.parquet.test.avro.AvroMapOfArr
}

@Override
@SuppressWarnings(value="unchecked")
public AvroMapOfArray build() {
try {
AvroMapOfArray record = new AvroMapOfArray();
@@ -182,6 +182,7 @@ public org.apache.spark.sql.execution.datasources.parquet.test.avro.AvroNonNulla
}

@Override
@SuppressWarnings(value="unchecked")
public AvroNonNullableArrays build() {
try {
AvroNonNullableArrays record = new AvroNonNullableArrays();
@@ -182,6 +182,7 @@ public org.apache.spark.sql.execution.datasources.parquet.test.avro.Nested.Build
}

@Override
@SuppressWarnings(value="unchecked")
public Nested build() {
try {
Nested record = new Nested();
@@ -235,6 +235,7 @@ public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroC
}

@Override
@SuppressWarnings(value="unchecked")
public ParquetAvroCompat build() {
try {
ParquetAvroCompat record = new ParquetAvroCompat();
@@ -309,6 +309,7 @@ private static <T> Set<T> toSet(List<T> records) {
}

@SafeVarargs
@SuppressWarnings("varargs")
private static <T> Set<T> asSet(T... records) {
return toSet(Arrays.asList(records));
}
@@ -31,7 +31,6 @@ import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.{LogicalRDD, Queryable}
import org.apache.spark.sql.execution.columnar.InMemoryRelation
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.internal.SQLConf

abstract class QueryTest extends PlanTest {

@@ -67,7 +67,14 @@ trait StreamTest extends QueryTest with Timeouts {

implicit class RichContinuousQuery(cq: ContinuousQuery) {
def stopQuietly(): Unit = quietly {
cq.stop()
try {
failAfter(10.seconds) {
cq.stop()
}
} catch {
case e: TestFailedDueToTimeoutException =>
logError(e.getMessage(), e)
}
}
}

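The bounded-stop pattern can be sketched in isolation. This assumes ScalaTest's `Timeouts` trait (which `StreamTest` already mixes in) and `SpanSugar` for the `10.seconds` syntax; `Stoppable` is a hypothetical stand-in for `ContinuousQuery`:

```scala
import org.scalatest.concurrent.Timeouts
import org.scalatest.exceptions.TestFailedDueToTimeoutException
import org.scalatest.time.SpanSugar._

trait Stoppable { def stop(): Unit } // hypothetical stand-in for ContinuousQuery

trait BoundedStop extends Timeouts {
  def stopQuietly(q: Stoppable): Unit = {
    try {
      // Bound the wait so a hung stop() cannot wedge the whole suite.
      failAfter(10.seconds) { q.stop() }
    } catch {
      case e: TestFailedDueToTimeoutException =>
        // Log and continue rather than failing the enclosing test.
        Console.err.println(s"Query did not stop within the timeout: ${e.getMessage}")
    }
  }
}
```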
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Repartition}
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.columnar.InMemoryRelation
import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReusedExchange, ReuseExchange, ShuffleExchange}
import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, ShuffledHashJoin, SortMergeJoin}
import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, SortMergeJoin}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
@@ -18,7 +18,6 @@
package org.apache.spark.sql.execution

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.execution.aggregate.TungstenAggregate
import org.apache.spark.sql.execution.joins.BroadcastHashJoin
import org.apache.spark.sql.functions.{avg, broadcast, col, max}
@@ -17,7 +17,7 @@

package org.apache.spark.sql.execution.datasources

import java.io.{File, FilenameFilter}
import java.io.File

import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.mapreduce.Job
@@ -28,7 +28,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionSet, PredicateHelper}
import org.apache.spark.sql.catalyst.util
import org.apache.spark.sql.execution.{DataSourceScan, PhysicalRDD}
import org.apache.spark.sql.execution.DataSourceScan
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
@@ -22,7 +22,6 @@ import scala.collection.JavaConverters._
import scala.util.Try

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.util.{Benchmark, Utils}
@@ -22,7 +22,6 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream,
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.util.collection.CompactBuffer