
[NSE-1161] Support read-write parquet conversion to read-write arrow (#1162)

* add ArrowConvertorExtension

* do not convert the parquet file format when writing to partitioned/bucketed/sorted output

* fix cache failure

* take the write codec into account

* disable the convertor extension by default (see the configuration sketch below)

* add some comments
jackylee-ch authored Dec 6, 2022
1 parent be51864 commit 7e4b1d5
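
For reference, here is a minimal sketch of how a job might opt in to the new convertor. It relies only on names that appear in this change (com.intel.oap.GazellePlugin, spark.oap.extension.convertor.enabled); the object name, application name, and paths are illustrative.

import org.apache.spark.sql.SparkSession

object ArrowConvertorExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("arrow-convertor-example")
      // Register the Gazelle plugin; its driver plugin injects the session extensions.
      .config("spark.plugins", "com.intel.oap.GazellePlugin")
      // Opt in to the parquet-to-arrow convertor, which this commit leaves disabled by default.
      .config("spark.oap.extension.convertor.enabled", "true")
      .getOrCreate()

    // Ordinary parquet reads and writes are now candidates for conversion to ArrowFileFormat.
    spark.read.parquet("/tmp/events").write.parquet("/tmp/events_copy")

    spark.stop()
  }
}
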
Showing 3 changed files with 98 additions and 10 deletions.
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.intel.oap.spark.sql

import java.util.Locale

import com.intel.oap.spark.sql.execution.datasources.arrow.ArrowFileFormat
import org.apache.parquet.hadoop.ParquetOutputFormat

import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command.InsertIntoDataSourceDirCommand
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InsertIntoHadoopFsRelationCommand, LogicalRelation}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat

class ArrowConvertorExtension extends (SparkSessionExtensions => Unit) {
  def apply(e: SparkSessionExtensions): Unit = {
    e.injectPostHocResolutionRule(session => ArrowConvertorRule(session))
  }
}

case class ArrowConvertorRule(session: SparkSession) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = {
    plan resolveOperators {
      // Write datasource path
      // TODO: support writing with partitioned/bucketed/sorted columns
      case c: InsertIntoHadoopFsRelationCommand
          if c.fileFormat.isInstanceOf[ParquetFileFormat] &&
            c.partitionColumns.isEmpty && c.bucketSpec.isEmpty =>
        // TODO: support passing parquet configs and writing with other codecs.
        // `compression`, `parquet.compression` (i.e., ParquetOutputFormat.COMPRESSION), and
        // `spark.sql.parquet.compression.codec` are in order of precedence from highest to lowest.
        val parquetCompressionConf = c.options.get(ParquetOutputFormat.COMPRESSION)
        val codecName = c.options
          .get("compression")
          .orElse(parquetCompressionConf)
          .getOrElse(session.sessionState.conf.parquetCompressionCodec)
          .toLowerCase(Locale.ROOT)
        if (codecName.equalsIgnoreCase("snappy")) {
          c.copy(fileFormat = new ArrowFileFormat)
        } else {
          c
        }

      // Read path
      case l @ LogicalRelation(
          r @ HadoopFsRelation(_, _, _, _, _: ParquetFileFormat, _), _, _, _) =>
        l.copy(relation = r.copy(fileFormat = new ArrowFileFormat)(session))

      // INSERT OVERWRITE DIRECTORY ... USING parquet
      case c: InsertIntoDataSourceDirCommand if c.provider == "parquet" =>
        c.copy(provider = "arrow")
    }
  }
}
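
To make the rule's behavior concrete: only plain (non-partitioned, non-bucketed) snappy-compressed parquet writes are rewritten to ArrowFileFormat, while parquet reads are rewritten unconditionally. A hedged sketch, assuming a session with the extension already injected; the data and paths are illustrative.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val df = Seq((1, "a"), (2, "b")).toDF("id", "name")

// Rewritten to ArrowFileFormat: non-partitioned, non-bucketed, snappy (Spark's default parquet codec).
df.write.mode("overwrite").parquet("/tmp/converted")

// Kept on ParquetFileFormat: a non-snappy codec is not handled by the convertor yet.
df.write.option("compression", "gzip").mode("overwrite").parquet("/tmp/not_converted")

// Kept on ParquetFileFormat: partitioned output is excluded by the rule.
df.write.partitionBy("name").mode("overwrite").parquet("/tmp/partitioned")

// The read side is rewritten to ArrowFileFormat as well.
val roundTrip = spark.read.parquet("/tmp/converted")
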
@@ -219,6 +219,10 @@ class ArrowFileFormat extends FileFormat with DataSourceRegister with Logging wi
   }
 
   override def shortName(): String = "arrow"
+
+  override def hashCode(): Int = getClass.hashCode()
+
+  override def equals(other: Any): Boolean = other.isInstanceOf[ArrowFileFormat]
 }
 
 object ArrowFileFormat {
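
A note on why the equals/hashCode overrides matter (presumably the "fix cache failure" item in the commit message above): HadoopFsRelation is a case class, so plan and cache matching compares its fileFormat field, and without these overrides two separately constructed ArrowFileFormat instances would never compare equal. A minimal sketch of the property the overrides establish:

import com.intel.oap.spark.sql.execution.datasources.arrow.ArrowFileFormat

// Any two instances are now interchangeable for equality and hashing, which is what
// relation and cache matching rely on after the convertor rule swaps in a fresh instance.
val a = new ArrowFileFormat
val b = new ArrowFileFormat
assert(a == b && a.hashCode == b.hashCode)
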
@@ -23,11 +23,9 @@ import java.util.Objects
 
 import scala.language.implicitConversions
 
-import com.intel.oap.GazellePlugin.GAZELLE_SESSION_EXTENSION_NAME
-import com.intel.oap.GazellePlugin.GAZELLE_WRITE_SESSION_EXTENSION_NAME
-import com.intel.oap.GazellePlugin.SPARK_SESSION_EXTS_KEY
-import com.intel.oap.extension.ColumnarOverrides
+import com.intel.oap.GazellePlugin.{GAZELLE_CONVERTOR_SESSION_EXTENSION_ENABLED, GAZELLE_CONVERTOR_SESSION_EXTENSION_NAME, GAZELLE_SESSION_EXTENSION_NAME, GAZELLE_WRITE_SESSION_EXTENSION_NAME, SPARK_SESSION_EXTS_KEY}
 import com.intel.oap.extension.{OptimizerOverrides, StrategyOverrides}
+import com.intel.oap.extension.ColumnarOverrides
 
 import org.apache.spark.SparkConf
 import org.apache.spark.SparkContext
@@ -36,7 +34,7 @@ import org.apache.spark.api.plugin.ExecutorPlugin
 import org.apache.spark.api.plugin.PluginContext
 import org.apache.spark.api.plugin.SparkPlugin
 import org.apache.spark.sql.SparkSessionExtensions
-import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
+import org.apache.spark.sql.internal.StaticSQLConf
 
 class GazellePlugin extends SparkPlugin {
   override def driverPlugin(): DriverPlugin = {
@@ -58,12 +56,19 @@ private[oap] class GazelleDriverPlugin extends DriverPlugin {
   def setPredefinedConfigs(conf: SparkConf): Unit = {
     val extensions = conf.getOption(SPARK_SESSION_EXTS_KEY).getOrElse("")
     if (extensions.contains(GAZELLE_SESSION_EXTENSION_NAME) ||
-        extensions.contains(GAZELLE_WRITE_SESSION_EXTENSION_NAME)) {
+        extensions.contains(GAZELLE_WRITE_SESSION_EXTENSION_NAME) ||
+        extensions.contains(GAZELLE_CONVERTOR_SESSION_EXTENSION_NAME)) {
       throw new IllegalArgumentException("Spark gazelle extensions are already specified before " +
         "enabling Gazelle plugin: " + conf.get(GazellePlugin.SPARK_SESSION_EXTS_KEY))
     }
-    conf.set(SPARK_SESSION_EXTS_KEY,
-      s"$GAZELLE_SESSION_EXTENSION_NAME,$GAZELLE_WRITE_SESSION_EXTENSION_NAME,$extensions")
+    if (conf.getBoolean(GAZELLE_CONVERTOR_SESSION_EXTENSION_ENABLED, false)) {
+      conf.set(SPARK_SESSION_EXTS_KEY,
+        s"$GAZELLE_SESSION_EXTENSION_NAME,$GAZELLE_WRITE_SESSION_EXTENSION_NAME," +
+          s"$GAZELLE_CONVERTOR_SESSION_EXTENSION_NAME, $extensions")
+    } else {
+      conf.set(SPARK_SESSION_EXTS_KEY,
+        s"$GAZELLE_SESSION_EXTENSION_NAME,$GAZELLE_WRITE_SESSION_EXTENSION_NAME, $extensions")
+    }
   }
 }
 
@@ -81,7 +86,7 @@ private[oap] class SparkConfImplicits(conf: SparkConf) {
   def enableGazellePlugin(): SparkConf = {
     if (conf.contains(GazellePlugin.SPARK_SQL_PLUGINS_KEY)) {
       throw new IllegalArgumentException("A Spark plugin is already specified before enabling " +
-        "Gazelle plugin: " + conf.get(GazellePlugin.SPARK_SQL_PLUGINS_KEY))
+          "Gazelle plugin: " + conf.get(GazellePlugin.SPARK_SQL_PLUGINS_KEY))
     }
     conf.set(GazellePlugin.SPARK_SQL_PLUGINS_KEY, GazellePlugin.GAZELLE_PLUGIN_NAME)
   }
@@ -100,12 +105,19 @@ private[oap] object GazellePlugin {
   // To enable GazellePlugin in production, set "spark.plugins=com.intel.oap.GazellePlugin"
   val SPARK_SQL_PLUGINS_KEY: String = "spark.plugins"
   val GAZELLE_PLUGIN_NAME: String = Objects.requireNonNull(classOf[GazellePlugin]
-    .getCanonicalName)
+      .getCanonicalName)
   val SPARK_SESSION_EXTS_KEY: String = StaticSQLConf.SPARK_SESSION_EXTENSIONS.key
   val GAZELLE_SESSION_EXTENSION_NAME: String = Objects.requireNonNull(
     classOf[GazelleSessionExtensions].getCanonicalName)
   val GAZELLE_WRITE_SESSION_EXTENSION_NAME: String = Objects.requireNonNull(
     "com.intel.oap.spark.sql.ArrowWriteExtension")
+  val GAZELLE_CONVERTOR_SESSION_EXTENSION_NAME: String = Objects.requireNonNull(
+    "com.intel.oap.spark.sql.ArrowConvertorExtension")
+  // This configuration is used to enable/disable the convertor from parquet to arrow format.
+  // Enabling the convertor extension may result in behavior inconsistent with vanilla Spark
+  // in some cases, such as the metadata file, struct type support, ignoreMissingFiles and so on.
+  // Thus this configuration is disabled by default.
+  val GAZELLE_CONVERTOR_SESSION_EXTENSION_ENABLED: String = "spark.oap.extension.convertor.enabled"
   /**
    * Specify all injectors that Gazelle is using in following list.
    */
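
One consequence of the guard in setPredefinedConfigs, worth spelling out: the Gazelle extension classes, including the new ArrowConvertorExtension, must not be listed manually in spark.sql.extensions when the plugin is enabled; the convertor is opted into only through the new flag. A hedged illustration using only keys and class names from this change:

import org.apache.spark.SparkConf

// Accepted: the driver plugin injects the extensions itself when the flag is on.
val accepted = new SparkConf()
  .set("spark.plugins", "com.intel.oap.GazellePlugin")
  .set("spark.oap.extension.convertor.enabled", "true")

// Rejected: setPredefinedConfigs throws an IllegalArgumentException, because a Gazelle
// extension is already present in spark.sql.extensions before the plugin configures it.
val rejected = new SparkConf()
  .set("spark.plugins", "com.intel.oap.GazellePlugin")
  .set("spark.sql.extensions", "com.intel.oap.spark.sql.ArrowConvertorExtension")
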
