
[NSE-1161] Support read-write parquet conversion to read-write arrow #1162

Merged

Conversation

jackylee-ch
Contributor

What changes were proposed in this pull request?

In our scenario, users usually read or write Hive parquet tables, which are different from arrow datasource tables, so we need a conversion rule to convert ParquetFileFormat to ArrowFileFormat.
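For illustration, such a rule could look roughly like the sketch below. The class name, the match conditions, and the package path of ArrowFileFormat are assumptions for illustration, not the exact code in this patch:

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
// Assumed location of Gazelle's Arrow-based FileFormat implementation.
import com.intel.oap.spark.sql.execution.datasources.arrow.ArrowFileFormat

// Hypothetical rule: rewrite parquet scans (e.g. Hive parquet tables) so they
// go through ArrowFileFormat instead of the vanilla ParquetFileFormat.
case class ParquetToArrowConvertRule() extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
    case l @ LogicalRelation(r: HadoopFsRelation, _, _, _)
        if r.fileFormat.isInstanceOf[ParquetFileFormat] =>
      l.copy(relation = r.copy(fileFormat = new ArrowFileFormat)(r.sparkSession))
  }
}
```

The actual extension would presumably be registered through SparkSessionExtensions so it runs as part of plan resolution.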

How was this patch tested?

Unit tests; we have also used it in our scenario.

@github-actions

github-actions bot commented Nov 7, 2022

#1161

@PHILO-HE
Collaborator

PHILO-HE commented Nov 7, 2022

It seems we already have an experimental feature to overwrite vanilla spark's ParquetFileFormat with an implementation extending ArrowFileFormat. Does it do the same thing as this patch?
See code link.

@jackylee-ch
Contributor Author

> It seems we already have an experimental feature to overwrite vanilla spark's ParquetFileFormat with an implementation extending ArrowFileFormat. Does it do the same thing as this patch? See code link.

Thanks for your attention. Yes, both can convert parquet to the arrow file format, except that ParquetFileFormat there does not currently support writing, and that would be very easy to add.
The main reason for this PR is to simplify how Gazelle is used. Users can enable Gazelle just by setting spark.plugins, rather than adding an extra spark-arrow-datasource-parquet.jar plus an extraClassPath config to make sure it overrides the vanilla ParquetFileFormat.
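As a rough illustration of the difference (the plugin class name and jar path below are placeholders, not necessarily the exact names used by Gazelle):

```
# spark-defaults.conf -- with this PR, enabling the plugin is enough;
# the conversion rule then covers hive parquet tables:
spark.plugins                     com.intel.oap.GazellePlugin

# Previously, parquet coverage also required shipping the datasource jar and
# placing it ahead of vanilla Spark on the class path:
# spark.driver.extraClassPath     /path/to/spark-arrow-datasource-parquet.jar
# spark.executor.extraClassPath   /path/to/spark-arrow-datasource-parquet.jar
```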

@jackylee-ch
Contributor Author

cc @zhouyuan. All suite tests have passed.

@PHILO-HE
Collaborator

Running the full UT suite produced some failures. I extracted some and posted them here.

  • org.apache.spark.sql.FileBasedDataSourceSuite.SPARK-15474 Write and read back non-empty schema with empty dataframe - parquet
    Unable to infer schema for Parquet. It must be specified manually.
  • org.apache.spark.sql.FileBasedDataSourceSuite.SPARK-23271 empty RDD when saved should write a metadata only file - parquet
    0 did not equal 1
  • org.apache.spark.sql.FileBasedDataSourceSuite.SPARK-22146 read files containing special characters using parquet
    Unable to infer schema for Parquet. It must be specified manually.
  • org.apache.spark.sql.FileBasedDataSourceSuite.Enabling/disabling ignoreMissingFiles using parquet
    Job aborted due to stage failure: Task 0 in stage 91.0 failed 1 times, most recent failure: Lost task 0.0 in stage 91.0 (TID 119) (sr595 executor driver): java.lang.RuntimeException: Failed to open local file '///tmp/spark-4c6451bd-b58e-46fa-8af8-b9f7de780267/first/part-00000-5cd4f602-3070-4ca7-8740-390e6849bc75-c000.parquet'
 at org.apache.arrow.dataset.jni.JniWrapper.inspectSchema(Native Method)

  • org.apache.spark.sql.FileBasedDataSourceSuite.SPARK-24204 error handling for unsupported Null data types - csv, parquet, orc
    Expected exception org.apache.spark.sql.AnalysisException to be thrown, but org.apache.spark.SparkException was thrown
  • org.apache.spark.sql.FileBasedDataSourceSuite.Spark native readers should respect spark.sql.caseSensitive - parquet
    Expected exception org.apache.spark.SparkException to be thrown, but no exception was thrown
  • org.apache.spark.sql.SQLQuerySuite.SPARK-26709: OptimizeMetadataOnlyQuery does not handle empty records correctly
    Results do not match for query
  • org.apache.spark.sql.SQLQuerySuite.SPARK-33593: Vector reader got incorrect data with binary partition value
    java.lang.IllegalArgumentException: Illegal character in path at index 149...
  • org.apache.spark.sql.SQLQuerySuite.SPARK-34212 Parquet should read decimals correctly
    Expected exception org.apache.spark.SparkException to be thrown, but no exception was thrown
  • org.apache.spark.sql.StatisticsCollectionSuite.analyze column command - result verification
    did not equal CatalogColumnStat(Some(2),.....

@jackylee-ch
Contributor Author

@PHILO-HE Thanks for the double check. I will look into these tests.

@zhouyuan zhouyuan (Collaborator) left a comment


👍

.get("compression")
.orElse(parquetCompressionConf)
.getOrElse(session.sessionState.conf.parquetCompressionCodec)
.toLowerCase(Locale.ROOT)
@jackylee-ch (Contributor Author)

Compression is supported in this PR. Maybe merge this PR first, then I can remove this code?
#1014
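For reference, the write usage that #1014 enables looks like this (example taken from that patch's description later in this thread):

```scala
df.coalesce(1)
  .write
  .format("arrow")
  .option("parquet.compression", "zstd")
  .save(path)
```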

Collaborator

got it, thanks

@zhouyuan zhouyuan merged commit 7e4b1d5 into oap-project:main Dec 6, 2022
zhouyuan pushed a commit to zhouyuan/native-sql-engine that referenced this pull request Dec 14, 2022
[NSE-1161] Support read-write parquet conversion to read-write arrow (oap-project#1162)

* add ArrowConvertExtension

* do not convert parquet fileformat while writing to partitioned/bucketed/sorted output

* fix cache failed

* care about write codec

* disable convertor extension by default

* add some comments
zhouyuan added a commit that referenced this pull request Dec 14, 2022
* [NSE-1170] Set correct row number in batch scan w/ partition columns (#1172)

* [NSE-1171] Throw RuntimeException when reading duplicate fields in case-insensitive mode (#1173)

* throw exception if one more columns matched in case insensitive mode

* add schema check in arrow v2

* bump h2/pgsql version (#1176)

* bump h2/pgsql version

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* ignore one failed test

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* [NSE-956] allow to write parquet with compression (#1014)

This patch adds support for writing parquet with compression

df.coalesce(1).write.format("arrow").option("parquet.compression","zstd").save(path)

Signed-off-by: Yuan Zhou yuan.zhou@intel.com

* [NSE-1161] Support read-write parquet conversion to read-write arrow (#1162)

* add ArrowConvertExtension

* do not convert parquet fileformat while writing to partitioned/bucketed/sorted output

* fix cache failed

* care about write codec

* disable convertor extension by default

* add some comments

* remove wrong compress type check (#1178)

Since compression has been supported in #1014, the extra compression check in ArrowConvertorExtension can be removed now.

* fix to use right arrow branch (#1179)


fix to use right arrow branch
Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* [NSE-1171] Support merge parquet schema and read missing schema (#1175)

* Support merge parquet schema and read missing schema

* fix error

* optimize null vectors

* optimize code

* optimize code

* change code

* add schema merge suite tests

* add test for struct type

* to use 1.5 branch arrow

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>
Signed-off-by: Yuan Zhou yuan.zhou@intel.com
Co-authored-by: Jacky Lee <lijunqing@baidu.com>