
[SPARK-2179][SQL] Public API for DataTypes and Schema #1346

Closed
wants to merge 47 commits

Conversation

yhuai
Contributor

@yhuai yhuai commented Jul 9, 2014

The current PR contains the following changes:

  • Expose DataTypes in the sql package (internal details are private to sql).
  • Users can create Rows.
  • Introduce applySchema to create a SchemaRDD by applying a schema: StructType to an RDD[Row].
  • Add a function simpleString to every DataType. Also, the schema represented by a StructType can be visualized by printSchema.
  • ScalaReflection.typeOfObject provides a way to infer the Catalyst data type based on an object. Also, we can compose typeOfObject with custom logic to form a new function to infer the data type (for different use cases).
  • JsonRDD has been refactored to use changes introduced by this PR.
  • Add a field containsNull to ArrayType. So, we can explicitly mark if an ArrayType can contain null values. The default value of containsNull is false.
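
The `typeOfObject` hook above lends itself to composition. A minimal sketch, assuming `ScalaReflection.typeOfObject` is exposed as a `PartialFunction[Any, DataType]` as in this PR (the `BigInteger` case is a purely illustrative custom rule, not part of the PR):

```scala
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.ScalaReflection

// Compose the built-in inference with custom logic: fall back to an
// extra rule for a type the default partial function does not handle.
val inferType: PartialFunction[Any, DataType] =
  ScalaReflection.typeOfObject orElse {
    case _: java.math.BigInteger => DecimalType  // hypothetical mapping
  }
```

Because both pieces are partial functions, `orElse` tries the built-in cases first and only consults the custom rule when they do not apply.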

New APIs are introduced in the sql package object and SQLContext. You can find the scaladoc at the
[sql package object](http://yhuai.github.io/site/api/scala/index.html#org.apache.spark.sql.package) and [SQLContext](http://yhuai.github.io/site/api/scala/index.html#org.apache.spark.sql.SQLContext).

An example of using applySchema is shown below.

```scala
import org.apache.spark.sql._
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val schema =
  StructType(
    StructField("name", StringType, false) ::
    StructField("age", IntegerType, true) :: Nil)

val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Row(p(0), p(1).trim.toInt))
val peopleSchemaRDD = sqlContext.applySchema(people, schema)
peopleSchemaRDD.printSchema
// root
// |-- name: string (nullable = false)
// |-- age: integer (nullable = true)

peopleSchemaRDD.registerAsTable("people")
sqlContext.sql("select name from people").collect.foreach(println)
```

I will add new contents to the SQL programming guide later.

JIRA: https://issues.apache.org/jira/browse/SPARK-2179

* Expose `DataType`s in the sql package (internal details are private to sql).
* Introduce `createSchemaRDD` to create a `SchemaRDD` from an `RDD` with a provided schema (represented by a `StructType`) and a provided function to construct `Row`,
* Add a function `simpleString` to every `DataType`. Also, the schema represented by a `StructType` can be visualized by `printSchema`.
@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

*
* @group userf
*/
def createSchemaRDD[A](rdd: RDD[A], schema: StructType, constructRow: A => Row) = {
Contributor

Naming nit: functional languages usually use "make"; moreover, SparkContext already has a public makeRDD.
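
For context, a usage sketch of this `createSchemaRDD` variant as reviewed here; the `Person` case class and sample data are illustrative, and this three-argument form was later reworked into `applySchema` within this PR:

```scala
// Hypothetical usage: the caller supplies the schema and a function
// that converts each record of the RDD into a Row.
case class Person(name: String, age: Int)

val schema =
  StructType(
    StructField("name", StringType, false) ::
    StructField("age", IntegerType, true) :: Nil)

val people = sc.parallelize(Seq(Person("Alice", 30), Person("Bob", 25)))
val schemaRDD =
  sqlContext.createSchemaRDD(people, schema, (p: Person) => Row(p.name, p.age))
```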

@AmplabJenkins

Merged build finished. All automated tests passed.

@AmplabJenkins

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16474/

@ueshin
Member

ueshin commented Jul 10, 2014

Hi, I'm wondering if MapType will have something like containsNull for ArrayType.

@yhuai
Contributor Author

yhuai commented Jul 10, 2014

Hi @ueshin, @marmbrus and I discussed it. We think it is not semantically clear what a null means when it appears in the key or value field (considering that a null is used to indicate a missing data value). So, we decided that the key and value in a MapType should not contain any null values, and we will not introduce containsNull to MapType. Does that make sense?

@ueshin
Member

ueshin commented Jul 10, 2014

@yhuai, I understand. Thank you for your reply.

@rxin
Contributor

rxin commented Jul 10, 2014

@yhuai I haven't looked at the changes yet, but can you make sure the end API is usable in Java?

private[sql] type JvmType = String
@transient private[sql] lazy val tag = typeTag[JvmType]
private[sql] val ordering = implicitly[Ordering[JvmType]]
def simpleString: String = "string"
}
Contributor

While you're at it, add a blank line to separate each class.

@yhuai
Contributor Author

yhuai commented Jul 10, 2014

Yeah, I will make sure new APIs are usable in Java and Python.

@SparkQA

SparkQA commented Jul 10, 2014

QA tests have started for PR 1346. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16528/consoleFull

@AmplabJenkins

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16527/

@SparkQA

SparkQA commented Jul 11, 2014

QA results for PR 1346:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
case class ArrayType(elementType: DataType) extends DataType {
case class StructField(name: String, dataType: DataType, nullable: Boolean) {
case class MapType(keyType: DataType, valueType: DataType) extends DataType {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16528/consoleFull

@SparkQA

SparkQA commented Jul 11, 2014

QA tests have started for PR 1346. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16547/consoleFull

@SparkQA

SparkQA commented Jul 11, 2014

QA results for PR 1346:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
case class ArrayType(elementType: DataType) extends DataType {
case class StructField(name: String, dataType: DataType, nullable: Boolean) {
case class MapType(keyType: DataType, valueType: DataType) extends DataType {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16547/consoleFull

@SparkQA

SparkQA commented Jul 11, 2014

QA tests have started for PR 1346. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16553/consoleFull

@SparkQA

SparkQA commented Jul 11, 2014

QA results for PR 1346:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
case class ArrayType(elementType: DataType) extends DataType {
case class StructField(name: String, dataType: DataType, nullable: Boolean) {
case class MapType(keyType: DataType, valueType: DataType) extends DataType {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16553/consoleFull

@SparkQA

SparkQA commented Jul 11, 2014

QA tests have started for PR 1346. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16572/consoleFull

@SparkQA

SparkQA commented Jul 29, 2014

QA results for PR 1346:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17344/consoleFull

@SparkQA

SparkQA commented Jul 29, 2014

QA tests have started for PR 1346. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17372/consoleFull

@SparkQA

SparkQA commented Jul 29, 2014

QA results for PR 1346:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17372/consoleFull

@SparkQA

SparkQA commented Jul 29, 2014

QA tests have started for PR 1346. This patch DID NOT merge cleanly!
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17374/consoleFull

@SparkQA

SparkQA commented Jul 29, 2014

QA results for PR 1346:
- This patch FAILED unit tests.

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17374/consoleFull

Conflicts:
	sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
	sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
Conflicts:
	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
	sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
	sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
	sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
@yhuai
Contributor Author

yhuai commented Jul 30, 2014

@chenghao-intel containsNull and valueContainsNull can be used for further optimization. For example, let's say we have an ArrayType column and the element type is IntegerType. If the elements of those arrays do not have null values, we can use a primitive array internally. Since we will expose data types to users, we need to introduce these two booleans with this PR. It can be hard to add them once users start to use these APIs.
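
A short sketch of how `containsNull` carries this optimization hint, using the `ArrayType` constructors from this PR (default `containsNull = false`):

```scala
// Elements are guaranteed non-null: an engine could back this column
// with a primitive Array[Int] rather than boxed, nullable values.
val denseInts = ArrayType(IntegerType, containsNull = false)

// The single-argument constructor defaults containsNull to false,
// so this is equivalent to the declaration above.
val sameAsAbove = ArrayType(IntegerType)

// Elements may be null, forcing a nullable (boxed) representation.
val sparseInts = ArrayType(IntegerType, containsNull = true)
```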

@SparkQA

SparkQA commented Jul 30, 2014

QA tests have started for PR 1346. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17423/consoleFull

@SparkQA

SparkQA commented Jul 30, 2014

QA results for PR 1346:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17423/consoleFull

@marmbrus
Contributor

Thanks for working on this! Merged to master.

@asfgit asfgit closed this in 7003c16 Jul 30, 2014
@chenghao-intel
Contributor

Thank you @yhuai for the explanation.

@yhuai
Contributor Author

yhuai commented Jul 30, 2014

@yhuai yhuai mentioned this pull request Jul 30, 2014
@yhuai yhuai deleted the dataTypeAndSchema branch July 31, 2014 21:11
* StructType(
* StructField("name", StringType, false) ::
* StructField("age", IntegerType, true) :: Nil)
*
Contributor

Hi @yhuai, why do we need to define the schema as a StructType, and not directly as a Seq[StructField]? I tried to build a Seq[StructField] from JDBC metadata in #1612 https://github.com/apache/spark/pull/1612/files#diff-3 (it followed the code of your JsonRDD :)

It seems we do not need this StructType anywhere.

Contributor Author

For the completeness of our data types, we need StructType (Seq[StructField] is not a data type). For example, if the type of a field is a struct, we need a way to describe that the type of this field is a struct. Also, because a row is basically a struct value, it is natural to use StructType to represent a schema.

Contributor

Oh, yep, StructType is needed. I mean
def applySchema(rowRDD: RDD[Row], schema: StructType): SchemaRDD
could be
def applySchema(rowRDD: RDD[Row], schema: Seq[StructField]): SchemaRDD

Then we do not need to always use schema.fields.map(f => AttributeReference...)

we can directly use schema.map(f => AttributeReference...)

Contributor

This might be crazy... but if StructType <: Seq[StructField], then we could pass in either a StructType or a Seq[StructField]. It should be possible to do this fairly easily.
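
A minimal sketch of that idea, assuming StructType implements `Seq[StructField]` by delegating to its `fields` (simplified; it ignores the rest of the `DataType` hierarchy):

```scala
// If StructType itself is a Seq[StructField], every API that accepts
// Seq[StructField] also accepts a StructType with no conversion.
case class StructType(fields: Seq[StructField]) extends Seq[StructField] {
  def apply(idx: Int): StructField = fields(idx)
  def length: Int = fields.length
  def iterator: Iterator[StructField] = fields.iterator
}
```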

Contributor

Good, I merged the change and used this API, applySchema(rowRDD, appliedSchema), in #1612.

xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014

Author: Yin Huai <huai@cse.ohio-state.edu>

Closes apache#1346 from yhuai/dataTypeAndSchema and squashes the following commits:

1d45977 [Yin Huai] Clean up.
a6e08b4 [Yin Huai] Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
c712fbf [Yin Huai] Converts types of values based on defined schema.
4ceeb66 [Yin Huai] Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
e5f8df5 [Yin Huai] Scaladoc.
122d1e7 [Yin Huai] Address comments.
03bfd95 [Yin Huai] Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
2476ed0 [Yin Huai] Minor updates.
ab71f21 [Yin Huai] Format.
fc2bed1 [Yin Huai] Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
bd40a33 [Yin Huai] Address comments.
991f860 [Yin Huai] Move "asJavaDataType" and "asScalaDataType" to DataTypeConversions.scala.
1cb35fe [Yin Huai] Add "valueContainsNull" to MapType.
3edb3ae [Yin Huai] Python doc.
692c0b9 [Yin Huai] Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
1d93395 [Yin Huai] Python APIs.
246da96 [Yin Huai] Add java data type APIs to javadoc index.
1db9531 [Yin Huai] Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
d48fc7b [Yin Huai] Minor updates.
33c4fec [Yin Huai] Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
b9f3071 [Yin Huai] Java API for applySchema.
1c9f33c [Yin Huai] Java APIs for DataTypes and Row.
624765c [Yin Huai] Tests for applySchema.
aa92e84 [Yin Huai] Update data type tests.
8da1a17 [Yin Huai] Add Row.fromSeq.
9c99bc0 [Yin Huai] Several minor updates.
1d9c13a [Yin Huai] Update applySchema API.
85e9b51 [Yin Huai] Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
e495e4e [Yin Huai] More comments.
42d47a3 [Yin Huai] Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
c3f4a02 [Yin Huai] Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
2e58dbd [Yin Huai] Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
b8b7db4 [Yin Huai] 1. Move sql package object and package-info to sql-core. 2. Minor updates on APIs. 3. Update scala doc.
68525a2 [Yin Huai] Update JSON unit test.
3209108 [Yin Huai] Add unit tests.
dcaf22f [Yin Huai] Add a field containsNull to ArrayType to indicate if an array can contain null values or not. If an ArrayType is constructed by "ArrayType(elementType)" (the existing constructor), the value of containsNull is false.
9168b83 [Yin Huai] Update comments.
fc649d7 [Yin Huai] Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
eca7d04 [Yin Huai] Add two apply methods which will be used to extract StructField(s) from a StructType.
949d6bb [Yin Huai] When creating a SchemaRDD for a JSON dataset, users can apply an existing schema.
7a6a7e5 [Yin Huai] Fix bug introduced by the change made on SQLContext.inferSchema.
43a45e1 [Yin Huai] Remove sql.util.package introduced in a previous commit.
0266761 [Yin Huai] Format
03eec4c [Yin Huai] Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
90460ac [Yin Huai] Infer the Catalyst data type from an object and cast a data value to the expected type.
3fa0df5 [Yin Huai] Provide easier ways to construct a StructType.
16be3e5 [Yin Huai] This commit contains three changes: * Expose `DataType`s in the sql package (internal details are private to sql). * Introduce `createSchemaRDD` to create a `SchemaRDD` from an `RDD` with a provided schema (represented by a `StructType`) and a provided function to construct `Row`, * Add a function `simpleString` to every `DataType`. Also, the schema represented by a `StructType` can be visualized by `printSchema`.