[SPARK-13543][SQL] Support for specifying compression codec for Parquet/ORC via option() #11464

Closed (wants to merge 17 commits)
55 changes: 34 additions & 21 deletions python/pyspark/sql/readwriter.py
@@ -454,7 +454,7 @@ def saveAsTable(self, name, format=None, mode=None, partitionBy=None, **options)
self._jwrite.saveAsTable(name)

@since(1.4)
def json(self, path, mode=None):
def json(self, path, mode=None, compression=None):
"""Saves the content of the :class:`DataFrame` in JSON format at the specified path.

:param path: the path in any Hadoop supported file system
@@ -464,18 +464,19 @@ def json(self, path, mode=None):
* ``overwrite``: Overwrite existing data.
* ``ignore``: Silently ignore this operation if data already exists.
* ``error`` (default case): Throw an exception if data already exists.

You can set the following JSON-specific option(s) for writing JSON files:
* ``compression`` (default ``None``): compression codec to use when saving to file.
This can be one of the known case-insensitive shorten names
(``bzip2``, ``gzip``, ``lz4``, and ``snappy``).
:param compression: compression codec to use when saving to file. This can be one of the
known case-insensitive short names (none, bzip2, gzip, lz4,
snappy and deflate).

>>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data'))
"""
self.mode(mode)._jwrite.json(path)
self.mode(mode)
if compression is not None:
self.option("compression", compression)
self._jwrite.json(path)

@since(1.4)
def parquet(self, path, mode=None, partitionBy=None):
def parquet(self, path, mode=None, partitionBy=None, compression=None):
"""Saves the content of the :class:`DataFrame` in Parquet format at the specified path.

:param path: the path in any Hadoop supported file system
@@ -486,32 +487,37 @@ def parquet(self, path, mode=None, partitionBy=None):
* ``ignore``: Silently ignore this operation if data already exists.
* ``error`` (default case): Throw an exception if data already exists.
:param partitionBy: names of partitioning columns
:param compression: compression codec to use when saving to file. This can be one of the
known case-insensitive short names (none, snappy, gzip, and lzo).
This will overwrite ``spark.sql.parquet.compression.codec``.

>>> df.write.parquet(os.path.join(tempfile.mkdtemp(), 'data'))
"""
self.mode(mode)
if partitionBy is not None:
self.partitionBy(partitionBy)
if compression is not None:
self.option("compression", compression)
self._jwrite.parquet(path)

@since(1.6)
def text(self, path):
def text(self, path, compression=None):
"""Saves the content of the DataFrame in a text file at the specified path.

:param path: the path in any Hadoop supported file system
:param compression: compression codec to use when saving to file. This can be one of the
known case-insensitive short names (none, bzip2, gzip, lz4,
snappy and deflate).

The DataFrame must have only one column that is of string type.
Each row becomes a new line in the output file.

You can set the following option(s) for writing text files:
* ``compression`` (default ``None``): compression codec to use when saving to file.
This can be one of the known case-insensitive shorten names
(``bzip2``, ``gzip``, ``lz4``, and ``snappy``).
"""
if compression is not None:
self.option("compression", compression)
self._jwrite.text(path)

@since(2.0)
def csv(self, path, mode=None):
def csv(self, path, mode=None, compression=None):
"""Saves the content of the [[DataFrame]] in CSV format at the specified path.

:param path: the path in any Hadoop supported file system
@@ -522,17 +528,19 @@ def csv(self, path, mode=None):
* ``ignore``: Silently ignore this operation if data already exists.
* ``error`` (default case): Throw an exception if data already exists.

You can set the following CSV-specific option(s) for writing CSV files:
* ``compression`` (default ``None``): compression codec to use when saving to file.
This can be one of the known case-insensitive shorten names
(``bzip2``, ``gzip``, ``lz4``, and ``snappy``).
:param compression: compression codec to use when saving to file. This can be one of the
known case-insensitive short names (none, bzip2, gzip, lz4,
snappy and deflate).

>>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
"""
self.mode(mode)._jwrite.csv(path)
self.mode(mode)
if compression is not None:
self.option("compression", compression)
self._jwrite.csv(path)

@since(1.5)
def orc(self, path, mode=None, partitionBy=None):
def orc(self, path, mode=None, partitionBy=None, compression=None):
"""Saves the content of the :class:`DataFrame` in ORC format at the specified path.

::Note: Currently ORC support is only available together with
@@ -546,13 +554,18 @@ def orc(self, path, mode=None, partitionBy=None):
* ``ignore``: Silently ignore this operation if data already exists.
* ``error`` (default case): Throw an exception if data already exists.
:param partitionBy: names of partitioning columns
:param compression: compression codec to use when saving to file. This can be one of the
known case-insensitive short names (none, snappy, zlib, and lzo).
This will overwrite ``orc.compress``.

>>> orc_df = hiveContext.read.orc('python/test_support/sql/orc_partitioned')
>>> orc_df.write.orc(os.path.join(tempfile.mkdtemp(), 'data'))
"""
self.mode(mode)
if partitionBy is not None:
self.partitionBy(partitionBy)
if compression is not None:
self.option("compression", compression)
self._jwrite.orc(path)

@since(1.4)
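Taken together, the readwriter.py changes above expose the codec as a keyword argument on each writer method; when it is not None the keyword is simply forwarded to the generic option() call. A minimal usage sketch (not part of the diff), assuming an active `sqlContext` and an existing DataFrame `df`:

import os
import tempfile

# Assumes `sqlContext` and `df` already exist, e.g. from the doctest globals above.
out = tempfile.mkdtemp()

# JSON/text/CSV writers take the Hadoop codecs (none, bzip2, gzip, lz4, snappy, deflate).
df.write.json(os.path.join(out, 'json_gzip'), compression='gzip')

# Parquet/ORC take their own codec sets and override the session-wide defaults
# (spark.sql.parquet.compression.codec and orc.compress respectively).
df.write.parquet(os.path.join(out, 'parquet_uncompressed'), compression='none')

# Equivalent spelling through the generic option() API:
df.write.option('compression', 'gzip').json(os.path.join(out, 'json_gzip_2'))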
19 changes: 16 additions & 3 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -455,7 +455,8 @@ final class DataFrameWriter private[sql](df: DataFrame) {
*
* You can set the following JSON-specific option(s) for writing JSON files:
* <li>`compression` (default `null`): compression codec to use when saving to file. This can be
* one of the known case-insensitive shorten names (`bzip2`, `gzip`, `lz4`, and `snappy`). </li>
* one of the known case-insensitive short names (`none`, `bzip2`, `gzip`, `lz4`,
* `snappy` and `deflate`). </li>
*
* @since 1.4.0
*/
@@ -468,6 +469,11 @@ final class DataFrameWriter private[sql](df: DataFrame) {
* format("parquet").save(path)
* }}}
*
* You can set the following Parquet-specific option(s) for writing Parquet files:
* <li>`compression` (default `null`): compression codec to use when saving to file. This can be
* one of the known case-insensitive short names (`none`, `snappy`, `gzip`, and `lzo`).
* This will overwrite `spark.sql.parquet.compression.codec`. </li>
*
* @since 1.4.0
*/
def parquet(path: String): Unit = format("parquet").save(path)
@@ -479,6 +485,11 @@ final class DataFrameWriter private[sql](df: DataFrame) {
* format("orc").save(path)
* }}}
*
* You can set the following ORC-specific option(s) for writing ORC files:
* <li>`compression` (default `null`): compression codec to use when saving to file. This can be
* one of the known case-insensitive short names (`none`, `snappy`, `zlib`, and `lzo`).
* This will overwrite `orc.compress`. </li>
*
* @since 1.5.0
* @note Currently, this method can only be used together with `HiveContext`.
*/
@@ -498,7 +509,8 @@ final class DataFrameWriter private[sql](df: DataFrame) {
*
* You can set the following option(s) for writing text files:
* <li>`compression` (default `null`): compression codec to use when saving to file. This can be
* one of the known case-insensitive shorten names (`bzip2`, `gzip`, `lz4`, and `snappy`). </li>
* one of the known case-insensitive short names (`none`, `bzip2`, `gzip`, `lz4`,
* `snappy` and `deflate`). </li>
*
* @since 1.6.0
*/
@@ -513,7 +525,8 @@ final class DataFrameWriter private[sql](df: DataFrame) {
*
* You can set the following CSV-specific option(s) for writing CSV files:
* <li>`compression` (default `null`): compression codec to use when saving to file. This can be
* one of the known case-insensitive shorten names (`bzip2`, `gzip`, `lz4`, and `snappy`). </li>
* one of the known case-insensitive short names (`none`, `bzip2`, `gzip`, `lz4`,
* `snappy` and `deflate`). </li>
*
* @since 2.0.0
*/
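The Scala API surface itself is unchanged; the codec still goes through option(), and per the scaladoc above an explicit option takes precedence over spark.sql.parquet.compression.codec (Parquet) and orc.compress (ORC). A rough PySpark illustration of that precedence, again assuming `sqlContext` and `df` (paths and names are illustrative, not part of the diff):

import os
import tempfile

out = tempfile.mkdtemp()

# Session-level default says snappy...
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")

# ...but an explicit writer option wins for this particular write.
df.write.option("compression", "gzip").parquet(os.path.join(out, "parquet_gzip"))

# Without the option, the session default applies as before.
df.write.parquet(os.path.join(out, "parquet_snappy"))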
CompressionCodecs.scala
@@ -25,6 +25,8 @@ import org.apache.spark.util.Utils

private[datasources] object CompressionCodecs {
private val shortCompressionCodecNames = Map(
"none" -> null,
"uncompressed" -> null,
"bzip2" -> classOf[BZip2Codec].getName,
"deflate" -> classOf[DeflateCodec].getName,
"gzip" -> classOf[GzipCodec].getName,
@@ -39,7 +41,9 @@ private[datasources] object CompressionCodecs {
val codecName = shortCompressionCodecNames.getOrElse(name.toLowerCase, name)
try {
// Validate the codec name
Utils.classForName(codecName)
if (codecName != null) {
Utils.classForName(codecName)
}
codecName
} catch {
case e: ClassNotFoundException =>
@@ -53,10 +57,16 @@
* `codec` should be a full class path
*/
def setCodecConfiguration(conf: Configuration, codec: String): Unit = {
conf.set("mapreduce.output.fileoutputformat.compress", "true")
conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
conf.set("mapreduce.output.fileoutputformat.compress.codec", codec)
conf.set("mapreduce.map.output.compress", "true")
conf.set("mapreduce.map.output.compress.codec", codec)
if (codec != null) {
conf.set("mapreduce.output.fileoutputformat.compress", "true")
conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
conf.set("mapreduce.output.fileoutputformat.compress.codec", codec)
conf.set("mapreduce.map.output.compress", "true")
conf.set("mapreduce.map.output.compress.codec", codec)
} else {
// This means the `compression` option was set to `uncompressed` or `none`.
conf.set("mapreduce.output.fileoutputformat.compress", "false")
conf.set("mapreduce.map.output.compress", "false")
}
}
}
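The helper above resolves user-facing short names to Hadoop codec classes, maps none/uncompressed to a null codec, and in the null case explicitly switches the MapReduce compression flags off rather than setting a codec. A standalone Python sketch of the same lookup and configuration logic (a plain dict stands in for the Hadoop Configuration; the lz4/snappy entries are assumed from the part of the map elided in the hunk above):

SHORT_CODEC_NAMES = {
    "none": None,
    "uncompressed": None,
    "bzip2": "org.apache.hadoop.io.compress.BZip2Codec",
    "deflate": "org.apache.hadoop.io.compress.DeflateCodec",
    "gzip": "org.apache.hadoop.io.compress.GzipCodec",
    "lz4": "org.apache.hadoop.io.compress.Lz4Codec",        # assumed, elided in the hunk above
    "snappy": "org.apache.hadoop.io.compress.SnappyCodec",  # assumed, elided in the hunk above
}

def get_codec_class_name(name):
    # Unknown short names fall through unchanged and are treated as fully qualified
    # class names, mirroring getOrElse(name.toLowerCase, name) in the Scala code.
    return SHORT_CODEC_NAMES.get(name.lower(), name)

def set_codec_configuration(conf, codec):
    if codec is not None:
        conf["mapreduce.output.fileoutputformat.compress"] = "true"
        conf["mapreduce.output.fileoutputformat.compress.type"] = "BLOCK"
        conf["mapreduce.output.fileoutputformat.compress.codec"] = codec
        conf["mapreduce.map.output.compress"] = "true"
        conf["mapreduce.map.output.compress.codec"] = codec
    else:
        # The option was `none` or `uncompressed`: force compression off.
        conf["mapreduce.output.fileoutputformat.compress"] = "false"
        conf["mapreduce.map.output.compress"] = "false"

conf = {}
set_codec_configuration(conf, get_codec_class_name("NONE"))
assert conf["mapreduce.output.fileoutputformat.compress"] == "false"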
ParquetRelation.scala
@@ -148,6 +148,19 @@ private[sql] class ParquetRelation(
.get(ParquetRelation.METASTORE_SCHEMA)
.map(DataType.fromJson(_).asInstanceOf[StructType])

private val compressionCodec: Option[String] = parameters
.get("compression")
.map { codecName =>
// Validate if given compression codec is supported or not.
val shortParquetCompressionCodecNames = ParquetRelation.shortParquetCompressionCodecNames
if (!shortParquetCompressionCodecNames.contains(codecName.toLowerCase)) {
val availableCodecs = shortParquetCompressionCodecNames.keys.map(_.toLowerCase)
throw new IllegalArgumentException(s"Codec [$codecName] " +
s"is not available. Available codecs are ${availableCodecs.mkString(", ")}.")
}
codecName.toLowerCase
}

private lazy val metadataCache: MetadataCache = {
val meta = new MetadataCache
meta.refresh()
@@ -286,7 +299,8 @@ private[sql] class ParquetRelation(
ParquetRelation
.shortParquetCompressionCodecNames
.getOrElse(
sqlContext.conf.parquetCompressionCodec.toLowerCase(),
compressionCodec
.getOrElse(sqlContext.conf.parquetCompressionCodec.toLowerCase),
CompressionCodecName.UNCOMPRESSED).name())

new BucketedOutputWriterFactory {
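ParquetRelation validates the option eagerly when the relation is constructed, so an unsupported codec name fails before any output is written and the error lists the supported codecs. An illustrative standalone Python version of that check (the codec set follows the docstring above plus the uncompressed alias; the authoritative keys live in ParquetRelation.shortParquetCompressionCodecNames):

PARQUET_SHORT_CODECS = {"none", "uncompressed", "snappy", "gzip", "lzo"}

def validate_parquet_codec(codec_name):
    # Case-insensitive, like the Scala check above.
    name = codec_name.lower()
    if name not in PARQUET_SHORT_CODECS:
        raise ValueError(
            "Codec [%s] is not available. Available codecs are %s."
            % (codec_name, ", ".join(sorted(PARQUET_SHORT_CODECS))))
    return name

assert validate_parquet_codec("GZIP") == "gzip"
# validate_parquet_codec("zlib") would raise: zlib is an ORC codec, not a Parquet one.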
CSVSuite.scala
@@ -21,6 +21,12 @@ import java.io.File
import java.nio.charset.UnsupportedCharsetException
import java.sql.Timestamp

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.GzipCodec

import org.apache.spark.SparkException
import org.apache.spark.sql.{DataFrame, QueryTest, Row}
import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
@@ -396,6 +402,46 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
}
}

test("SPARK-13543 Set explicitly the output as uncompressed") {
Review thread on this test:
  Contributor: What exactly does this test do? It's not super clear here.
  Contributor: Is it to test whether uncompressed mode would work?
  Member (author): Yes. Which wording would be better? Should I just say "write the output as uncompressed via option"?
  Contributor: Yeah, I think that's better.

val clonedConf = new Configuration(hadoopConfiguration)
hadoopConfiguration.set("mapreduce.output.fileoutputformat.compress", "true")
hadoopConfiguration
.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
hadoopConfiguration
.set("mapreduce.output.fileoutputformat.compress.codec", classOf[GzipCodec].getName)
hadoopConfiguration.set("mapreduce.map.output.compress", "true")
hadoopConfiguration.set("mapreduce.map.output.compress.codec", classOf[GzipCodec].getName)
withTempDir { dir =>
try {
val csvDir = new File(dir, "csv").getCanonicalPath
val cars = sqlContext.read
.format("csv")
.option("header", "true")
.load(testFile(carsFile))

cars.coalesce(1).write
.format("csv")
.option("header", "true")
.option("compression", "none")
.save(csvDir)

val compressedFiles = new File(csvDir).listFiles()
assert(compressedFiles.exists(!_.getName.endsWith(".gz")))

val carsCopy = sqlContext.read
.format("csv")
.option("header", "true")
.load(csvDir)

verifyCars(carsCopy, withHeader = true)
} finally {
// Hadoop 1 doesn't have `Configuration.unset`
hadoopConfiguration.clear()
clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
}
}
}

test("Schema inference correctly identifies the datatype when data is sparse.") {
val df = sqlContext.read
.format("csv")
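The SPARK-13543 test above boils down to: force gzip through the Hadoop configuration, then check that option("compression", "none") still produces plain output files. A rough end-user equivalent in PySpark (not part of the diff), assuming the usual `sqlContext` and a DataFrame `df`:

import os
import tempfile

csv_dir = os.path.join(tempfile.mkdtemp(), "csv")
df.write.format("csv") \
    .option("header", "true") \
    .option("compression", "none") \
    .save(csv_dir)

# Mirrors the Scala assertion: at least one output file (the data part file)
# should not carry the .gz extension when compression is disabled.
assert any(not name.endswith(".gz") for name in os.listdir(csv_dir))

round_trip = sqlContext.read.format("csv").option("header", "true").load(csv_dir)
assert round_trip.count() == df.count()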
JsonSuite.scala
@@ -23,9 +23,10 @@ import java.sql.{Date, Timestamp}
import scala.collection.JavaConverters._

import com.fasterxml.jackson.core.JsonFactory
import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.GzipCodec
import org.scalactic.Tolerance._

import org.apache.spark.rdd.RDD
@@ -1524,6 +1525,49 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
}
}

test("SPARK-13543 Set explicitly the output as uncompressed") {
val clonedConf = new Configuration(hadoopConfiguration)
hadoopConfiguration.set("mapreduce.output.fileoutputformat.compress", "true")
hadoopConfiguration
.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
hadoopConfiguration
.set("mapreduce.output.fileoutputformat.compress.codec", classOf[GzipCodec].getName)
hadoopConfiguration.set("mapreduce.map.output.compress", "true")
hadoopConfiguration.set("mapreduce.map.output.compress.codec", classOf[GzipCodec].getName)
withTempDir { dir =>
try {
val dir = Utils.createTempDir()
dir.delete()

val path = dir.getCanonicalPath
primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)

val jsonDF = sqlContext.read.json(path)
val jsonDir = new File(dir, "json").getCanonicalPath
jsonDF.coalesce(1).write
.format("json")
.option("compression", "none")
.save(jsonDir)

val compressedFiles = new File(jsonDir).listFiles()
assert(compressedFiles.exists(!_.getName.endsWith(".gz")))

val jsonCopy = sqlContext.read
.format("json")
.load(jsonDir)

assert(jsonCopy.count == jsonDF.count)
val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
checkAnswer(jsonCopySome, jsonDFSome)
} finally {
// Hadoop 1 doesn't have `Configuration.unset`
hadoopConfiguration.clear()
clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
}
}
}

test("Casting long as timestamp") {
withTempTable("jsonTable") {
val schema = (new StructType).add("ts", TimestampType)