Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-2523] [SQL] Hadoop table scan bug fixing #1439

Closed

Conversation

chenghao-intel
Copy link
Contributor

In HiveTableScan.scala, ObjectInspector was created for all of the partition based records, which probably causes ClassCastException if the object inspector is not identical among table & partitions.

This is the follow up with:
#1408
#1390

I've run a micro benchmark in my local with 15000000 records totally, and got the result as below:

With This Patch Partition-Based Table Non-Partition-Based Table
No 1927 ms 1885 ms
Yes 1541 ms 1524 ms

It showed this patch will also improve the performance.

PS: the benchmark code is also attached. (thanks @liancheng )

package org.apache.spark.sql.hive

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql._

object HiveTableScanPrepare extends App {
  case class Record(key: String, value: String)

  val sparkContext = new SparkContext(
    new SparkConf()
      .setMaster("local")
      .setAppName(getClass.getSimpleName.stripSuffix("$")))

  val hiveContext = new LocalHiveContext(sparkContext)

  val rdd = sparkContext.parallelize((1 to 3000000).map(i => Record(s"$i", s"val_$i")))

  import hiveContext._

  hql("SHOW TABLES")
  hql("DROP TABLE if exists part_scan_test")
  hql("DROP TABLE if exists scan_test")
  hql("DROP TABLE if exists records")
  rdd.registerAsTable("records")

  hql("""CREATE TABLE part_scan_test (key STRING, value STRING) PARTITIONED BY (part1 string, part2 STRING) 
                 | ROW FORMAT SERDE 
                 | 'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe' 
                 | STORED AS RCFILE
               """.stripMargin)
  hql("""CREATE TABLE scan_test (key STRING, value STRING)
                 | ROW FORMAT SERDE 
                 | 'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe' 
                 | STORED AS RCFILE
               """.stripMargin)

  for (part1 <- 2000 until 2001) {
    for (part2 <- 1 to 5) {
      hql(s"""from records 
                 | insert into table part_scan_test PARTITION (part1='$part1', part2='2010-01-$part2') 
                 | select key, value
               """.stripMargin)
      hql(s"""from records 
                 | insert into table scan_test select key, value
               """.stripMargin)
    }
  }
}

object HiveTableScanTest extends App {
  val sparkContext = new SparkContext(
    new SparkConf()
      .setMaster("local")
      .setAppName(getClass.getSimpleName.stripSuffix("$")))

  val hiveContext = new LocalHiveContext(sparkContext)

  import hiveContext._

  hql("SHOW TABLES")
  val part_scan_test = hql("select key, value from part_scan_test")
  val scan_test = hql("select key, value from scan_test")

  val r_part_scan_test = (0 to 5).map(i => benchmark(part_scan_test))
  val r_scan_test = (0 to 5).map(i => benchmark(scan_test))
  println("Scanning Partition-Based Table")
  r_part_scan_test.foreach(printResult)
  println("Scanning Non-Partition-Based Table")
  r_scan_test.foreach(printResult)

  def printResult(result: (Long, Long)) {
    println(s"Duration: ${result._1} ms Result: ${result._2}")
  }

  def benchmark(srdd: SchemaRDD) = {
    val begin = System.currentTimeMillis()
    val result = srdd.count()
    val end = System.currentTimeMillis()
    ((end - begin), result)
  }
}

@chenghao-intel chenghao-intel changed the title [SPARK-2523] [SQL] Hadoop table scan [SPARK-2523] [SQL] Hadoop table scan bug fixing Jul 16, 2014
@SparkQA
Copy link

SparkQA commented Jul 16, 2014

QA tests have started for PR 1439. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16724/consoleFull

@chenghao-intel chenghao-intel changed the title [SPARK-2523] [SQL] Hadoop table scan bug fixing [SPARK-2523] [SQL] [WIP] Hadoop table scan bug fixing Jul 16, 2014
@chenghao-intel
Copy link
Contributor Author

ObjectInspector is not required by Row in Catalyst any more (not like in Shark), and it is tightly coupled with Deserializer & the raw data, so I moved the ObjectInspector into TableReader.

@SparkQA
Copy link

SparkQA commented Jul 16, 2014

QA results for PR 1439:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class HadoopTableReader(@transient attributes: Seq[Attribute],

For more information see test ouptut:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16724/consoleFull

@yhuai
Copy link
Contributor

yhuai commented Jul 16, 2014

Could you elaborate on when we will see an exception? Can you provide a case?

iter.map { value =>
val raw = deserializer.deserialize(value)
var idx = 0;
while(idx < fieldRefs.length) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: space after while

@chenghao-intel
Copy link
Contributor Author

@yhuai & @concretevitamin thanks for the comments, I've updated the description in Jira, can you please jump there and take a look?

@chenghao-intel
Copy link
Contributor Author

Sorry, forgot to paste the link. https://issues.apache.org/jira/browse/SPARK-2523

@SparkQA
Copy link

SparkQA commented Jul 17, 2014

QA tests have started for PR 1439. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16766/consoleFull

@SparkQA
Copy link

SparkQA commented Jul 17, 2014

QA results for PR 1439:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class HadoopTableReader(@transient attributes: Seq[Attribute],

For more information see test ouptut:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16766/consoleFull


private[this] def castFromString(value: String, dataType: DataType) = {
Cast(Literal(value), dataType).eval(null)
}

private def addColumnMetadataToConf(hiveConf: HiveConf) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would keep it. It is important to set needed columns in conf. So, RCFile and ORC can know what columns should be skipped. Also, seems hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames) and hiveConf.set(serdeConstants.LIST_COLUMNS, columnInternalNames) will be used to push down filters.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, yes, I didn't realize that. I will revert it.

@yhuai
Copy link
Contributor

yhuai commented Jul 17, 2014

I think we are not clear on the boundary between a TableReader and a physical TableScan operator (e.g. HiveTableScan). Seems we just want TableReader to create RDDs (general-purpose work) and inside a TableScan operator, we create Catalyst Rows (table-specific work). However, when we look at HadoopTableReader, it is actually a HiveTableReader. For every Hive partition, we create a HadoopRDD (requiring Hive-specific code) and deserialize Hive rows. I am not sure if TableReader is a good abstraction.

I think it makes sense to remove the trait of TableReader and add a abstract TableScan class (inheriting LeafNode). All existing TableScan operators will inherit this abstract TableScan class. If we think it is the right approach. I can do it in another PR.

@marmbrus, @liancheng, @rxin, @chenghao-intel thoughts?

@yhuai
Copy link
Contributor

yhuai commented Jul 17, 2014

@chenghao-intel explained the root cause in https://issues.apache.org/jira/browse/SPARK-2523. Basically, we should use partition-specific ObjectInspectors to extract fields instead of using the ObjectInspectors set in the TableDesc. If partitions of a table are using different SerDe, their ObjectInspectors will be different. @chenghao-intel can you add unit tests? Is there any Hive query test that can be included?

@chenghao-intel
Copy link
Contributor Author

@yhuai I agree with you we should make a clear boundary between HiveTableScan and TableReader, but I am not sure if it's a good idea to create multiple HiveTableScan classes instead of one. Routing to different table scan operators may requires exposing more details in the SparkPlanner, which sits inside of the HiveTableScan currently.
Perhaps making multiple TableReader is more reasonable, for example, TableReader, PartitionReader, MemoryTableReader etc.

@yhuai
Copy link
Contributor

yhuai commented Jul 18, 2014

@chenghao-intel I did not meant to introduce multiple HiveTableScan. I meant to have a abstract TableScan and make existing ones (e.g. HiveTableScan and ParquetTableScan) be subclasses of the abstract TableScan.

@chenghao-intel
Copy link
Contributor Author

@yhuai I got your mean eventually, I think you're right, some of the logic could be shared among TableScan operators.

@marmbrus
Copy link
Contributor

I'll just add the the HiveTableReader vs HiveTableScan separation is purely artificial, and the split is based on what code was stolen from Shark vs what code was written for Spark SQL. It would be reasonable to combine them at some point. However, for this PR it would be great to just fix the bug at hand.

If we are going to do major refactoring I'd want to see benchmarks showing that we aren't introducing any performance regressions.

It would also be nice to see a test case that would be currently failing but passes after this PR is added.

@liancheng
Copy link
Contributor

As for benchmarks, the micro benchmark code comes with #758 may be helpful. And I feel that partitioning support for Parquet should be considered together with the refactoring @yhuai suggested.

@SparkQA
Copy link

SparkQA commented Jul 21, 2014

QA tests have started for PR 1439. This patch DID NOT merge cleanly!
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16896/consoleFull

@SparkQA
Copy link

SparkQA commented Jul 21, 2014

QA tests have started for PR 1439. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16897/consoleFull

@chenghao-intel
Copy link
Contributor Author

Thank you guys, I've updated the code as suggested, and the also provide the micro-benchmark result in the PR description.

@SparkQA
Copy link

SparkQA commented Jul 21, 2014

QA results for PR 1439:
- This patch FAILED unit tests.

For more information see test ouptut:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16896/consoleFull

@SparkQA
Copy link

SparkQA commented Jul 21, 2014

QA results for PR 1439:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class HadoopTableReader(@transient attributes: Seq[Attribute],

For more information see test ouptut:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16897/consoleFull

@chenghao-intel
Copy link
Contributor Author

@yhuai @concretevitamin @marmbrus @liancheng Can you take a look of this? I think the test result gave us more confidence for the improvement.

@@ -114,6 +77,7 @@ case class HiveTableScan(
val columnInternalNames = neededColumnIDs.map(HiveConf.getColumnInternalName(_)).mkString(",")

if (attributes.size == relation.output.size) {
// TODO what if duplicated attributes queried?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A HiveTableScan will be only created by SQLContext#pruneFilterProject and pruneFilterProject will make sure there is no duplicated attributes. Let's add a comment to explain it.


import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.test.TestHive
import org.scalatest.{BeforeAndAfterAll, FunSuite}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yhuai
Copy link
Contributor

yhuai commented Jul 26, 2014

@chenghao-intel Sorry for my late reply. Other than those minor comment and format issues, it looks good to me.

@yhuai
Copy link
Contributor

yhuai commented Jul 26, 2014

Also, can you delete [WIP] from the PR title and update it to match the JIRA title?

""".stripMargin)
TestHive.hql("""from src insert into table part_scan_test PARTITION (ds='2010-01-02')
| select 200,200 limit 1
""".stripMargin)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: let's make all SQL keywords capital here.

@chenghao-intel chenghao-intel changed the title [SPARK-2523] [SQL] [WIP] Hadoop table scan bug fixing [SPARK-2523] [SQL] Hadoop table scan bug fixing Jul 28, 2014
@chenghao-intel
Copy link
Contributor Author

Thank you @yhuai , @liancheng ,I've updated the code according to your comments, let's see if SparkQA happy with the changes.

@SparkQA
Copy link

SparkQA commented Jul 28, 2014

QA tests have started for PR 1439. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17269/consoleFull

@SparkQA
Copy link

SparkQA commented Jul 28, 2014

QA results for PR 1439:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class HadoopTableReader(

For more information see test ouptut:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17269/consoleFull

@yhuai
Copy link
Contributor

yhuai commented Jul 28, 2014

LGTM

@marmbrus
Copy link
Contributor

Thanks! I've merged this into master.

@chenghao-intel
Copy link
Contributor Author

@marmbrus seems this PR is still open. Can you double check if it's merged?

@marmbrus
Copy link
Contributor

Apache mirroring is broken. We've filled a bug with infra.
On Jul 28, 2014 6:50 PM, "Cheng Hao" notifications@github.com wrote:

@marmbrus https://github.com/marmbrus seems this PR is still open. Can
you double check if it's merged?


Reply to this email directly or view it on GitHub
#1439 (comment).

@asfgit asfgit closed this in 2b8d89e Jul 29, 2014
@chenghao-intel chenghao-intel deleted the hadoop_table_scan branch July 30, 2014 02:14
@yhuai
Copy link
Contributor

yhuai commented Jul 30, 2014

I am looking at it.

asfgit pushed a commit that referenced this pull request Jul 31, 2014
…maven test)

This PR tries to resolve the broken Jenkins maven test issue introduced by #1439. Now, we create a single query test to run both the setup work and the test query.

Author: Yin Huai <huai@cse.ohio-state.edu>

Closes #1669 from yhuai/SPARK-2523-fixTest and squashes the following commits:

358af1a [Yin Huai] Make partition_based_table_scan_with_different_serde run atomically.
xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014
In HiveTableScan.scala, ObjectInspector was created for all of the partition based records, which probably causes ClassCastException if the object inspector is not identical among table & partitions.

This is the follow up with:
apache#1408
apache#1390

I've run a micro benchmark in my local with 15000000 records totally, and got the result as below:

With This Patch  |  Partition-Based Table  |  Non-Partition-Based Table
------------ | ------------- | -------------
No  |  1927 ms  |  1885 ms
Yes  | 1541 ms  |  1524 ms

It showed this patch will also improve the performance.

PS:  the benchmark code is also attached. (thanks liancheng )
```
package org.apache.spark.sql.hive

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql._

object HiveTableScanPrepare extends App {
  case class Record(key: String, value: String)

  val sparkContext = new SparkContext(
    new SparkConf()
      .setMaster("local")
      .setAppName(getClass.getSimpleName.stripSuffix("$")))

  val hiveContext = new LocalHiveContext(sparkContext)

  val rdd = sparkContext.parallelize((1 to 3000000).map(i => Record(s"$i", s"val_$i")))

  import hiveContext._

  hql("SHOW TABLES")
  hql("DROP TABLE if exists part_scan_test")
  hql("DROP TABLE if exists scan_test")
  hql("DROP TABLE if exists records")
  rdd.registerAsTable("records")

  hql("""CREATE TABLE part_scan_test (key STRING, value STRING) PARTITIONED BY (part1 string, part2 STRING)
                 | ROW FORMAT SERDE
                 | 'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe'
                 | STORED AS RCFILE
               """.stripMargin)
  hql("""CREATE TABLE scan_test (key STRING, value STRING)
                 | ROW FORMAT SERDE
                 | 'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe'
                 | STORED AS RCFILE
               """.stripMargin)

  for (part1 <- 2000 until 2001) {
    for (part2 <- 1 to 5) {
      hql(s"""from records
                 | insert into table part_scan_test PARTITION (part1='$part1', part2='2010-01-$part2')
                 | select key, value
               """.stripMargin)
      hql(s"""from records
                 | insert into table scan_test select key, value
               """.stripMargin)
    }
  }
}

object HiveTableScanTest extends App {
  val sparkContext = new SparkContext(
    new SparkConf()
      .setMaster("local")
      .setAppName(getClass.getSimpleName.stripSuffix("$")))

  val hiveContext = new LocalHiveContext(sparkContext)

  import hiveContext._

  hql("SHOW TABLES")
  val part_scan_test = hql("select key, value from part_scan_test")
  val scan_test = hql("select key, value from scan_test")

  val r_part_scan_test = (0 to 5).map(i => benchmark(part_scan_test))
  val r_scan_test = (0 to 5).map(i => benchmark(scan_test))
  println("Scanning Partition-Based Table")
  r_part_scan_test.foreach(printResult)
  println("Scanning Non-Partition-Based Table")
  r_scan_test.foreach(printResult)

  def printResult(result: (Long, Long)) {
    println(s"Duration: ${result._1} ms Result: ${result._2}")
  }

  def benchmark(srdd: SchemaRDD) = {
    val begin = System.currentTimeMillis()
    val result = srdd.count()
    val end = System.currentTimeMillis()
    ((end - begin), result)
  }
}
```

Author: Cheng Hao <hao.cheng@intel.com>

Closes apache#1439 from chenghao-intel/hadoop_table_scan and squashes the following commits:

888968f [Cheng Hao] Fix issues in code style
27540ba [Cheng Hao] Fix the TableScan Bug while partition serde differs
40a24a7 [Cheng Hao] Add Unit Test
xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014
…maven test)

This PR tries to resolve the broken Jenkins maven test issue introduced by apache#1439. Now, we create a single query test to run both the setup work and the test query.

Author: Yin Huai <huai@cse.ohio-state.edu>

Closes apache#1669 from yhuai/SPARK-2523-fixTest and squashes the following commits:

358af1a [Yin Huai] Make partition_based_table_scan_with_different_serde run atomically.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants