Skip to content

Commit

Permalink
[SPARK-14482][SQL] Change default compression codec for Parquet from …
Browse files Browse the repository at this point in the history
…gzip to snappy
  • Loading branch information
rxin committed Apr 8, 2016
1 parent 04fb7db commit 70ea5f8
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ import java.nio.charset.StandardCharsets
import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes}

private[sql] class CSVOptions(
@transient private val parameters: Map[String, String])
private[sql] class CSVOptions(@transient private val parameters: Map[String, String])
extends Logging with Serializable {

private def getChar(paramName: String, default: Char): Char = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.parquet

import org.apache.parquet.hadoop.metadata.CompressionCodecName

import org.apache.spark.internal.Logging
import org.apache.spark.sql.internal.SQLConf

/**
* Options for the Parquet data source.
*/
class ParquetOptions(
@transient private val parameters: Map[String, String],
@transient private val sqlConf: SQLConf)
extends Logging with Serializable {

import ParquetOptions._

/**
* Compression codec to use. By default use the value specified in SQLConf.
* Acceptable values are defined in [[shortParquetCompressionCodecNames]].
*/
val compressionCodec: String = {
val codecName = parameters.getOrElse("compression", sqlConf.parquetCompressionCodec).toLowerCase
if (!shortParquetCompressionCodecNames.contains(codecName)) {
val availableCodecs = shortParquetCompressionCodecNames.keys.map(_.toLowerCase)
throw new IllegalArgumentException(s"Codec [$codecName] " +
s"is not available. Available codecs are ${availableCodecs.mkString(", ")}.")
}
shortParquetCompressionCodecNames(codecName).name()
}
}


object ParquetOptions {
// The parquet compression short names
private val shortParquetCompressionCodecNames = Map(
"none" -> CompressionCodecName.UNCOMPRESSED,
"uncompressed" -> CompressionCodecName.UNCOMPRESSED,
"snappy" -> CompressionCodecName.SNAPPY,
"gzip" -> CompressionCodecName.GZIP,
"lzo" -> CompressionCodecName.LZO)
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ private[sql] class DefaultSource
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {

val parquetOptions = new ParquetOptions(options, sqlContext.sessionState.conf)

val conf = ContextUtil.getConfiguration(job)

val committerClass =
Expand All @@ -84,24 +86,11 @@ private[sql] class DefaultSource

if (conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key) == null) {
logInfo("Using default output committer for Parquet: " +
classOf[ParquetOutputCommitter].getCanonicalName)
classOf[ParquetOutputCommitter].getCanonicalName)
} else {
logInfo("Using user defined output committer for Parquet: " + committerClass.getCanonicalName)
}

val compressionCodec: Option[String] = options
.get("compression")
.map { codecName =>
// Validate if given compression codec is supported or not.
val shortParquetCompressionCodecNames = ParquetRelation.shortParquetCompressionCodecNames
if (!shortParquetCompressionCodecNames.contains(codecName.toLowerCase)) {
val availableCodecs = shortParquetCompressionCodecNames.keys.map(_.toLowerCase)
throw new IllegalArgumentException(s"Codec [$codecName] " +
s"is not available. Available codecs are ${availableCodecs.mkString(", ")}.")
}
codecName.toLowerCase
}

conf.setClass(
SQLConf.OUTPUT_COMMITTER_CLASS.key,
committerClass,
Expand Down Expand Up @@ -136,14 +125,7 @@ private[sql] class DefaultSource
sqlContext.conf.writeLegacyParquetFormat.toString)

// Sets compression scheme
conf.set(
ParquetOutputFormat.COMPRESSION,
ParquetRelation
.shortParquetCompressionCodecNames
.getOrElse(
compressionCodec
.getOrElse(sqlContext.conf.parquetCompressionCodec.toLowerCase),
CompressionCodecName.UNCOMPRESSED).name())
conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodec)

new OutputWriterFactory {
override def newInstance(
Expand Down Expand Up @@ -917,12 +899,4 @@ private[sql] object ParquetRelation extends Logging {
// should be removed after this issue is fixed.
}
}

// The parquet compression short names
val shortParquetCompressionCodecNames = Map(
"none" -> CompressionCodecName.UNCOMPRESSED,
"uncompressed" -> CompressionCodecName.UNCOMPRESSED,
"snappy" -> CompressionCodecName.SNAPPY,
"gzip" -> CompressionCodecName.GZIP,
"lzo" -> CompressionCodecName.LZO)
}
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ object SQLConf {
.stringConf
.transform(_.toLowerCase())
.checkValues(Set("uncompressed", "snappy", "gzip", "lzo"))
.createWithDefault("gzip")
.createWithDefault("snappy")

val PARQUET_FILTER_PUSHDOWN_ENABLED = SQLConfigBuilder("spark.sql.parquet.filterPushdown")
.doc("Enables Parquet filter push-down optimization when set to true.")
Expand Down

0 comments on commit 70ea5f8

Please sign in to comment.