Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/apache/spark
Browse files Browse the repository at this point in the history
update to the latest version
  • Loading branch information
JerryLead authored and JerryLead committed Dec 2, 2014
2 parents 52799e3 + b0a46d8 commit c0169da
Show file tree
Hide file tree
Showing 11 changed files with 76 additions and 28 deletions.
6 changes: 6 additions & 0 deletions docs/programming-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -934,6 +934,12 @@ for details.
<td> Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them.
This always shuffles all data over the network. </td>
</tr>
<tr>
<td> <b>repartitionAndSortWithinPartitions</b>(<i>partitioner</i>) </td>
<td> Repartition the RDD according to the given partitioner and, within each resulting partition,
sort records by their keys. This is more efficient than calling <code>repartition</code> and then sorting within
each partition because it can push the sorting down into the shuffle machinery. </td>
</tr>
</table>

### Actions
Expand Down
48 changes: 41 additions & 7 deletions docs/sql-programming-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ describes the various methods for loading data into a SchemaRDD.

Spark SQL supports two different methods for converting existing RDDs into SchemaRDDs. The first
method uses reflection to infer the schema of an RDD that contains specific types of objects. This
reflection based approach leads to more concise code and works well when you already know the schema
reflection based approach leads to more concise code and works well when you already know the schema
while writing your Spark application.

The second method for creating SchemaRDDs is through a programmatic interface that allows you to
Expand Down Expand Up @@ -566,7 +566,7 @@ for teenName in teenNames.collect():

### Configuration

Configuration of Parquet can be done using the `setConf` method on SQLContext or by running
Configuration of Parquet can be done using the `setConf` method on SQLContext or by running
`SET key=value` commands using SQL.

<table class="table">
Expand All @@ -575,8 +575,8 @@ Configuration of Parquet can be done using the `setConf` method on SQLContext or
<td><code>spark.sql.parquet.binaryAsString</code></td>
<td>false</td>
<td>
Some other Parquet-producing systems, in particular Impala and older versions of Spark SQL, do
not differentiate between binary data and strings when writing out the Parquet schema. This
Some other Parquet-producing systems, in particular Impala and older versions of Spark SQL, do
not differentiate between binary data and strings when writing out the Parquet schema. This
flag tells Spark SQL to interpret binary data as a string to provide compatibility with these systems.
</td>
</tr>
Expand All @@ -591,10 +591,20 @@ Configuration of Parquet can be done using the `setConf` method on SQLContext or
<td><code>spark.sql.parquet.compression.codec</code></td>
<td>gzip</td>
<td>
Sets the compression codec use when writing Parquet files. Acceptable values include:
Sets the compression codec use when writing Parquet files. Acceptable values include:
uncompressed, snappy, gzip, lzo.
</td>
</tr>
<tr>
<td><code>spark.sql.parquet.filterPushdown</code></td>
<td>false</td>
<td>
Turn on Parquet filter pushdown optimization. This feature is turned off by default because of a known
bug in Paruet 1.6.0rc3 (<a href="https://issues.apache.org/jira/browse/PARQUET-136">PARQUET-136</a>).
However, if your table doesn't contain any nullable string or binary columns, it's still safe to turn
this feature on.
</td>
</tr>
<tr>
<td><code>spark.sql.hive.convertMetastoreParquet</code></td>
<td>true</td>
Expand Down Expand Up @@ -945,7 +955,7 @@ options.

## Migration Guide for Shark User

### Scheduling
### Scheduling
To set a [Fair Scheduler](job-scheduling.html#fair-scheduler-pools) pool for a JDBC client session,
users can set the `spark.sql.thriftserver.scheduler.pool` variable:

Expand Down Expand Up @@ -992,7 +1002,7 @@ Several caching related features are not supported yet:
## Compatibility with Apache Hive

Spark SQL is designed to be compatible with the Hive Metastore, SerDes and UDFs. Currently Spark
SQL is based on Hive 0.12.0.
SQL is based on Hive 0.12.0 and 0.13.1.

#### Deploying in Existing Hive Warehouses

Expand Down Expand Up @@ -1031,6 +1041,7 @@ Spark SQL supports the vast majority of Hive features, such as:
* Sampling
* Explain
* Partitioned tables
* View
* All Hive DDL Functions, including:
* `CREATE TABLE`
* `CREATE TABLE AS SELECT`
Expand All @@ -1046,6 +1057,7 @@ Spark SQL supports the vast majority of Hive features, such as:
* `STRING`
* `BINARY`
* `TIMESTAMP`
* `DATE`
* `ARRAY<>`
* `MAP<>`
* `STRUCT<>`
Expand Down Expand Up @@ -1146,6 +1158,7 @@ evaluated by the SQL execution engine. A full list of the functions supported c
* Datetime type
- `TimestampType`: Represents values comprising values of fields year, month, day,
hour, minute, and second.
- `DateType`: Represents values comprising values of fields year, month, day.
* Complex types
- `ArrayType(elementType, containsNull)`: Represents values comprising a sequence of
elements with the type of `elementType`. `containsNull` is used to indicate if
Expand Down Expand Up @@ -1253,6 +1266,13 @@ import org.apache.spark.sql._
TimestampType
</td>
</tr>
<tr>
<td> <b>DateType</b> </td>
<td> java.sql.Date </td>
<td>
DateType
</td>
</tr>
<tr>
<td> <b>ArrayType</b> </td>
<td> scala.collection.Seq </td>
Expand Down Expand Up @@ -1379,6 +1399,13 @@ please use factory methods provided in
DataType.TimestampType
</td>
</tr>
<tr>
<td> <b>DateType</b> </td>
<td> java.sql.Date </td>
<td>
DataType.DateType
</td>
</tr>
<tr>
<td> <b>ArrayType</b> </td>
<td> java.util.List </td>
Expand Down Expand Up @@ -1526,6 +1553,13 @@ from pyspark.sql import *
TimestampType()
</td>
</tr>
<tr>
<td> <b>DateType</b> </td>
<td> datetime.date </td>
<td>
DateType()
</td>
</tr>
<tr>
<td> <b>ArrayType</b> </td>
<td> list, tuple, or array </td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ object HiveFromSpark {
val sc = new SparkContext(sparkConf)
val path = s"${System.getenv("SPARK_HOME")}/examples/src/main/resources/kv1.txt"

// A local hive context creates an instance of the Hive Metastore in process, storing
// the warehouse data in the current directory. This location can be overridden by
// specifying a second parameter to the constructor.
// A hive context adds support for finding tables in the MetaStore and writing queries
// using HiveQL. Users who do not have an existing Hive deployment can still create a
// HiveContext. When not configured by the hive-site.xml, the context automatically
// creates metastore_db and warehouse in the current directory.
val hiveContext = new HiveContext(sc)
import hiveContext._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,10 @@ class SqlLexical(val keywords: Seq[String]) extends StdLexical {

/** Generate all variations of upper and lower case of a given string */
def allCaseVersions(s: String, prefix: String = ""): Stream[String] = {
if (s == "") {
if (s.isEmpty) {
Stream(prefix)
} else {
allCaseVersions(s.tail, prefix + s.head.toLower) ++
allCaseVersions(s.tail, prefix + s.head.toLower) #:::
allCaseVersions(s.tail, prefix + s.head.toUpper)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,8 @@ class SqlParser extends AbstractSparkSQLParser {
| SUM ~> "(" ~> DISTINCT ~> expression <~ ")" ^^ { case exp => SumDistinct(exp) }
| COUNT ~ "(" ~> "*" <~ ")" ^^ { case _ => Count(Literal(1)) }
| COUNT ~ "(" ~> expression <~ ")" ^^ { case exp => Count(exp) }
| COUNT ~> "(" ~> DISTINCT ~> expression <~ ")" ^^ { case exp => CountDistinct(exp :: Nil) }
| COUNT ~> "(" ~> DISTINCT ~> repsep(expression, ",") <~ ")" ^^
{ case exps => CountDistinct(exps) }
| APPROXIMATE ~ COUNT ~ "(" ~ DISTINCT ~> expression <~ ")" ^^
{ case exp => ApproxCountDistinct(exp) }
| APPROXIMATE ~> "(" ~> floatLit ~ ")" ~ COUNT ~ "(" ~ DISTINCT ~ expression <~ ")" ^^
Expand Down Expand Up @@ -340,18 +341,13 @@ class SqlParser extends AbstractSparkSQLParser {
| floatLit ^^ { f => Literal(f.toDouble) }
)

private val longMax = BigDecimal(s"${Long.MaxValue}")
private val longMin = BigDecimal(s"${Long.MinValue}")
private val intMax = BigDecimal(s"${Int.MaxValue}")
private val intMin = BigDecimal(s"${Int.MinValue}")

private def toNarrowestIntegerType(value: String) = {
val bigIntValue = BigDecimal(value)

bigIntValue match {
case v if v < longMin || v > longMax => v
case v if v < intMin || v > intMax => v.toLong
case v => v.toInt
case v if bigIntValue.isValidInt => v.toIntExact
case v if bigIntValue.isValidLong => v.toLongExact
case v => v
}
}

Expand Down
4 changes: 4 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,8 @@ class SchemaRDD(
* {{{
* schemaRDD.limit(10)
* }}}
*
* @group Query
*/
def limit(limitNum: Int): SchemaRDD =
new SchemaRDD(sqlContext, Limit(Literal(limitNum), logicalPlan))
Expand Down Expand Up @@ -355,6 +357,8 @@ class SchemaRDD(
* Return the number of elements in the RDD. Unlike the base RDD implementation of count, this
* implementation leverages the query optimizer to compute the count on the SchemaRDD, which
* supports features such as filter pushdown.
*
* @group Query
*/
@Experimental
override def count(): Long = aggregate(Count(Literal(1))).collect().head.getLong(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ import scala.collection.JavaConversions._

/**
* Allows creation of parquet based tables using the syntax
* `CREATE TABLE ... USING org.apache.spark.sql.parquet`. Currently the only option required
* is `path`, which should be the location of a collection of, optionally partitioned,
* `CREATE TEMPORARY TABLE ... USING org.apache.spark.sql.parquet`. Currently the only option
* required is `path`, which should be the location of a collection of, optionally partitioned,
* parquet files.
*/
class DefaultSource extends RelationProvider {
Expand All @@ -49,7 +49,7 @@ class DefaultSource extends RelationProvider {
sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation = {
val path =
parameters.getOrElse("path", sys.error("'path' must be specifed for parquet tables."))
parameters.getOrElse("path", sys.error("'path' must be specified for parquet tables."))

ParquetRelation2(path)(sqlContext)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi
protected lazy val ddl: Parser[LogicalPlan] = createTable

/**
* CREATE FOREIGN TEMPORARY TABLE avroTable
* CREATE TEMPORARY TABLE avroTable
* USING org.apache.spark.sql.avro
* OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -992,4 +992,11 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
"nulldata2 on nulldata1.value <=> nulldata2.value"),
(1 to 2).map(i => Seq(i)))
}

test("Multi-column COUNT(DISTINCT ...)") {
val data = TestData(1,"val_1") :: TestData(2,"val_2") :: Nil
val rdd = sparkContext.parallelize((0 to 1).map(i => data(i)))
rdd.registerTempTable("distinctData")
checkAnswer(sql("SELECT COUNT(DISTINCT key,value) FROM distinctData"), 2)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ private[hive] object HiveQl {
protected def nameExpressions(exprs: Seq[Expression]): Seq[NamedExpression] = {
exprs.zipWithIndex.map {
case (ne: NamedExpression, _) => ne
case (e, i) => Alias(e, s"c_$i")()
case (e, i) => Alias(e, s"_c$i")()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ private[hive] case class HiveGenericUdtf(
}

override protected def makeOutput() = {
// Use column names when given, otherwise c_1, c_2, ... c_n.
// Use column names when given, otherwise _c1, _c2, ... _cn.
if (aliasNames.size == outputDataTypes.size) {
aliasNames.zip(outputDataTypes).map {
case (attrName, attrDataType) =>
Expand All @@ -288,7 +288,7 @@ private[hive] case class HiveGenericUdtf(
} else {
outputDataTypes.zipWithIndex.map {
case (attrDataType, i) =>
AttributeReference(s"c_$i", attrDataType, nullable = true)()
AttributeReference(s"_c$i", attrDataType, nullable = true)()
}
}
}
Expand Down

0 comments on commit c0169da

Please sign in to comment.