From 2f0b882e5c8787b09bedcc8208e6dcc5662dbbab Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Fri, 8 Apr 2016 23:52:04 -0700
Subject: [PATCH] [SPARK-14482][SQL] Change default Parquet codec from gzip to snappy

## What changes were proposed in this pull request?

Based on our tests, gzip decompression is very slow (< 100MB/s), making queries decompression bound. Snappy can decompress at ~ 500MB/s on a single core.

This patch changes the default compression codec for Parquet output from gzip to snappy, and also introduces a ParquetOptions class to be more consistent with other data sources (e.g. CSV, JSON).

## How was this patch tested?

Should be covered by existing unit tests.

Author: Reynold Xin

Closes #12256 from rxin/SPARK-14482.
---
 .../datasources/csv/CSVOptions.scala          |  3 +-
 .../datasources/parquet/ParquetOptions.scala  | 59 +++++++++++++++++++
 .../datasources/parquet/ParquetRelation.scala | 34 ++---------
 .../apache/spark/sql/internal/SQLConf.scala   |  2 +-
 4 files changed, 65 insertions(+), 33 deletions(-)
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index 95de02cf5c182..7b9d3b605a891 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -22,8 +22,7 @@ import java.nio.charset.StandardCharsets
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes}
 
-private[sql] class CSVOptions(
-    @transient private val parameters: Map[String, String])
+private[sql] class CSVOptions(@transient private val parameters: Map[String, String])
   extends Logging with Serializable {
 
   private def getChar(paramName: String, default: Char): Char = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
new file mode 100644
index 0000000000000..00352f23ae660
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.parquet.hadoop.metadata.CompressionCodecName
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * Options for the Parquet data source.
+ */
+class ParquetOptions(
+    @transient private val parameters: Map[String, String],
+    @transient private val sqlConf: SQLConf)
+  extends Logging with Serializable {
+
+  import ParquetOptions._
+
+  /**
+   * Compression codec to use. By default use the value specified in SQLConf.
+   * Acceptable values are defined in [[shortParquetCompressionCodecNames]].
+   */
+  val compressionCodec: String = {
+    val codecName = parameters.getOrElse("compression", sqlConf.parquetCompressionCodec).toLowerCase
+    if (!shortParquetCompressionCodecNames.contains(codecName)) {
+      val availableCodecs = shortParquetCompressionCodecNames.keys.map(_.toLowerCase)
+      throw new IllegalArgumentException(s"Codec [$codecName] " +
+        s"is not available. Available codecs are ${availableCodecs.mkString(", ")}.")
+    }
+    shortParquetCompressionCodecNames(codecName).name()
+  }
+}
+
+
+object ParquetOptions {
+  // The parquet compression short names
+  private val shortParquetCompressionCodecNames = Map(
+    "none" -> CompressionCodecName.UNCOMPRESSED,
+    "uncompressed" -> CompressionCodecName.UNCOMPRESSED,
+    "snappy" -> CompressionCodecName.SNAPPY,
+    "gzip" -> CompressionCodecName.GZIP,
+    "lzo" -> CompressionCodecName.LZO)
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 5ad95e4b9eac6..ca6803b73770a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -74,6 +74,8 @@ private[sql] class DefaultSource
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory = {
 
+    val parquetOptions = new ParquetOptions(options, sqlContext.sessionState.conf)
+
     val conf = ContextUtil.getConfiguration(job)
 
     val committerClass =
@@ -84,24 +86,11 @@ private[sql] class DefaultSource
 
     if (conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key) == null) {
       logInfo("Using default output committer for Parquet: " +
-          classOf[ParquetOutputCommitter].getCanonicalName)
+        classOf[ParquetOutputCommitter].getCanonicalName)
     } else {
       logInfo("Using user defined output committer for Parquet: " + committerClass.getCanonicalName)
     }
 
-    val compressionCodec: Option[String] = options
-      .get("compression")
-      .map { codecName =>
-        // Validate if given compression codec is supported or not.
-        val shortParquetCompressionCodecNames = ParquetRelation.shortParquetCompressionCodecNames
-        if (!shortParquetCompressionCodecNames.contains(codecName.toLowerCase)) {
-          val availableCodecs = shortParquetCompressionCodecNames.keys.map(_.toLowerCase)
-          throw new IllegalArgumentException(s"Codec [$codecName] " +
-            s"is not available. Available codecs are ${availableCodecs.mkString(", ")}.")
-        }
-        codecName.toLowerCase
-      }
-
     conf.setClass(
       SQLConf.OUTPUT_COMMITTER_CLASS.key,
       committerClass,
@@ -136,14 +125,7 @@ private[sql] class DefaultSource
       sqlContext.conf.writeLegacyParquetFormat.toString)
 
     // Sets compression scheme
-    conf.set(
-      ParquetOutputFormat.COMPRESSION,
-      ParquetRelation
-        .shortParquetCompressionCodecNames
-        .getOrElse(
-          compressionCodec
-            .getOrElse(sqlContext.conf.parquetCompressionCodec.toLowerCase),
-          CompressionCodecName.UNCOMPRESSED).name())
+    conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodec)
 
     new OutputWriterFactory {
       override def newInstance(
@@ -917,12 +899,4 @@ private[sql] object ParquetRelation extends Logging {
       // should be removed after this issue is fixed.
     }
   }
-
-  // The parquet compression short names
-  val shortParquetCompressionCodecNames = Map(
-    "none" -> CompressionCodecName.UNCOMPRESSED,
-    "uncompressed" -> CompressionCodecName.UNCOMPRESSED,
-    "snappy" -> CompressionCodecName.SNAPPY,
-    "gzip" -> CompressionCodecName.GZIP,
-    "lzo" -> CompressionCodecName.LZO)
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index dc6ba1bcfb6de..b58f960897ce8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -193,7 +193,7 @@ object SQLConf {
     .stringConf
     .transform(_.toLowerCase())
     .checkValues(Set("uncompressed", "snappy", "gzip", "lzo"))
-    .createWithDefault("gzip")
+    .createWithDefault("snappy")
 
   val PARQUET_FILTER_PUSHDOWN_ENABLED = SQLConfigBuilder("spark.sql.parquet.filterPushdown")
     .doc("Enables Parquet filter push-down optimization when set to true.")
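
For reviewers who want to see the user-facing effect of this change, below is a minimal usage sketch (not part of the patch). It assumes an in-scope `sqlContext`, an existing DataFrame `df`, and illustrative `/tmp` output paths; with this patch, Parquet writes default to snappy, the per-write `compression` option is validated by the new ParquetOptions class, and the previous default can still be restored through `spark.sql.parquet.compression.codec`.

```scala
// Usage sketch only (not part of this patch). Assumes an existing SQLContext
// named `sqlContext`, a DataFrame `df`, and illustrative /tmp output paths.

// After this patch, writing Parquet with no options produces snappy-compressed files.
df.write.parquet("/tmp/events_snappy")

// Per-write override via the data source option handled by ParquetOptions.
// Accepted values: none, uncompressed, snappy, gzip, lzo; anything else
// fails with an IllegalArgumentException listing the available codecs.
df.write.option("compression", "gzip").parquet("/tmp/events_gzip")

// Session-wide override of the new default through SQLConf.
sqlContext.setConf("spark.sql.parquet.compression.codec", "gzip")
df.write.parquet("/tmp/events_gzip_by_default")
```

Note that when the `compression` option is present it takes precedence over the session setting, since ParquetOptions falls back to `sqlConf.parquetCompressionCodec` only when the option is absent.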