PARQUET-2196: Support LZ4_RAW codec #1000
Conversation
@pitrou @shangxinli Can you please take a look? Thanks in advance!
Force-pushed from 3842da3 to e3ffe88.
cc @lidavidm @emkornfield: could you give this a look?
@wgtmac Did you try to read an actual file produced by Parquet C++?
Note you can find such files in https://github.com/apache/parquet-testing/
[Resolved review threads on parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestLz4RawCodec.java]
Yes, I have tried that. I will add some parquet files for compatibility tests as well.
Thanks Gang for contributing! Are there any benchmarking numbers? Any comparison with ZSTD? These are non-blocking questions for review and merging.
[Resolved review threads on parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/: Lz4RawCodec.java, Lz4RawCompressor.java, Lz4RawDecompressor.java, NonBlockedDecompressor.java]
Nice implementation! For the tests, can you add more coverage for interop with lz4?
@shangxinli Thanks for the review. I haven't been able to run the benchmark since the Hadoop Lz4Codec is implemented via JNI and I cannot simply run it on my Mac M1 laptop. Generally this is the same LZ4 algorithm as the one shipped by Hadoop. The key difference is that LZ4_RAW removes the redundant block headers introduced by the Hadoop Lz4Codec, and its implementation is pure Java (see the sketch below). I will add the interop test in a follow-up commit.
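LZ4_RAW is the raw LZ4 block format with no extra framing, so compression and decompression reduce to single whole-buffer calls. Below is a minimal round-trip sketch using the pure-Java aircompressor library; which pure-Java LZ4 implementation the patch actually uses is an assumption here.

```java
import io.airlift.compress.lz4.Lz4Compressor;
import io.airlift.compress.lz4.Lz4Decompressor;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class Lz4RawRoundTrip {
  public static void main(String[] args) {
    byte[] input = "hello hello hello hello".getBytes(StandardCharsets.UTF_8);

    // Raw block format: no frame header and no Hadoop-style length
    // prefixes, just the compressed bytes themselves.
    Lz4Compressor compressor = new Lz4Compressor();
    byte[] buffer = new byte[compressor.maxCompressedLength(input.length)];
    int compressedLength =
        compressor.compress(input, 0, input.length, buffer, 0, buffer.length);

    // The caller must know the uncompressed size up front; Parquet
    // records it in the page header.
    Lz4Decompressor decompressor = new Lz4Decompressor();
    byte[] restored = new byte[input.length];
    decompressor.decompress(buffer, 0, compressedLength, restored, 0, restored.length);

    System.out.println(Arrays.equals(input, restored)); // true
  }
}
```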
The interop test has been added. Please take a look again. Thanks! @shangxinli @pitrou
[Resolved review threads on parquet-common/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java, parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawCodec.java, parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawCompressor.java, and parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/TestInteropReadLz4RawCodec.java]
It would probably be easier to use a git submodule (I wonder why that's not the approach being used in …)
[Resolved review threads on parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawCodec.java, NonBlockedCompressor.java, NonBlockedDecompressor.java, and parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/TestInteropReadLz4RawCodec.java, TestLz4RawCodec.java]
What was the source of the instability?
I have changed the approach of the interop test to follow the encryption tests, which download test files from the parquet-testing repo and verify the data decompressed and decoded from them. In addition, the codec test has been made generic so that it supports both LZ4_RAW and SNAPPY; it should be easy to support more codec types. Please take a look when you have time and let me know if there is any feedback, thanks! @pitrou @emkornfield @ggershinsky @shangxinli
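For reference, here is a minimal sketch of what such an interop check can look like, reading a downloaded parquet-testing file with parquet-avro; the local path is a placeholder and the actual test code may differ.

```java
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;

public class ReadLz4RawInterop {
  public static void main(String[] args) throws Exception {
    // Placeholder path to a file downloaded from apache/parquet-testing.
    Path file = new Path("target/parquet-testing/lz4_raw_compressed.parquet");

    // Reading succeeds only if the LZ4_RAW pages decompress correctly.
    try (ParquetReader<GenericRecord> reader =
        AvroParquetReader.<GenericRecord>builder(file).build()) {
      GenericRecord record;
      while ((record = reader.read()) != null) {
        System.out.println(record);
      }
    }
  }
}
```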
LZ4("org.apache.hadoop.io.compress.Lz4Codec", CompressionCodec.LZ4, ".lz4"), | ||
ZSTD("org.apache.parquet.hadoop.codec.ZstandardCodec", CompressionCodec.ZSTD, ".zstd"); | ||
ZSTD("org.apache.parquet.hadoop.codec.ZstandardCodec", CompressionCodec.ZSTD, ".zstd"), | ||
LZ4_RAW("org.apache.parquet.hadoop.codec.Lz4RawCodec", CompressionCodec.LZ4_RAW, ".lz4raw"); |
I think both file extensions for LZ4 and LZ4_RAW should be removed. The reason is that the `lz4` command-line utilities expect the LZ4 frame format. In any case, the `.lz4` extension should certainly not be used for the "Hadoop LZ4" compression codec.
I searched the parquet-mr repo and found the only usage of the codec extension is as below:

```java
public RecordWriter<Void, T> getRecordWriter(TaskAttemptContext taskAttemptContext, Mode mode)
    throws IOException, InterruptedException {
  final Configuration conf = getConfiguration(taskAttemptContext);
  CompressionCodecName codec = getCodec(taskAttemptContext);
  String extension = codec.getExtension() + ".parquet";
  Path file = getDefaultWorkFile(taskAttemptContext, extension);
  return getRecordWriter(conf, file, codec, mode);
}
```
The extension is part of the enum definition and cannot be removed. I propose renaming `lz4` to `lz4hadoop` to explicitly differentiate this LZ4 variant from standard `lz4`; see the sketch below. @pitrou
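Under that proposal, the relevant enum entries would look roughly like this (a sketch only; the exact extension strings chosen in the final patch are an assumption here):

```java
// Hypothetical rename: ".lz4" no longer suggests the standard LZ4 frame
// format for the Hadoop-specific framing.
LZ4("org.apache.hadoop.io.compress.Lz4Codec", CompressionCodec.LZ4, ".lz4hadoop"),
LZ4_RAW("org.apache.parquet.hadoop.codec.Lz4RawCodec", CompressionCodec.LZ4_RAW, ".lz4raw");
```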
Sounds good to me.
@lwhite1 Could you perhaps review this? I can't really comment on the details of the Java code, myself.
LGTM, let's see if there are comments from others, otherwise we can merge.
Looks good to me.
A Delta Lake commit referencing this PR:

The following are the changes needed:

* PySpark 3.5 has deprecated support for Python 3.7. This required changes to the Delta test infra to install the appropriate Python version and other packages. The `Dockerfile` used for running tests is also updated to have the required Python version and packages, and uses the same base image as the PySpark test infra in Apache Spark.
* The `StructType.toAttributes` and `StructType.fromAttributes` methods are moved into a utility class, `DataTypeUtils`.
* The `iceberg` module is disabled, as there is no released version of `iceberg` that works with Spark 3.5 yet.
* Remove the URI path hack used in `DMLWithDeletionVectorsHelper` to get around a bug in Spark 3.4.
* Remove the unrelated tutorial in `delta/examples/tutorials/saiseu19`.
* Test failure fixes:
  * `org.apache.spark.sql.delta.DeltaHistoryManagerSuite` - the error message has changed.
  * `org.apache.spark.sql.delta.DeltaOptionSuite` - the Parquet file name used for the LZ4 codec has changed due to apache/parquet-java#1000 in the `parquet-mr` dependency.
  * `org.apache.spark.sql.delta.deletionvectors.DeletionVectorsSuite` - Parquet now generates a `row-index` whenever the `_metadata` column is selected; however, Spark 3.5 has a bug where a row group containing more than 2bn rows fails. For now, don't return any `row-index` column in `_metadata`, by overriding `metadataSchemaFields: Seq[StructField]` in `DeltaParquetFileFormat`.
  * `org.apache.spark.sql.delta.perf.OptimizeMetadataOnlyDeltaQuerySuite` - a behavior change by apache/spark#40922: Spark plans now use a new function called `ToPrettyString` instead of `cast(aggExpr TO STRING)` for `Dataset.show()` usage.
  * `org.apache.spark.sql.delta.DeltaCDCStreamDeletionVectorSuite` and `org.apache.spark.sql.delta.DeltaCDCStreamSuite` - a regression in the Spark 3.5 RC, fixed by apache/spark#42774 before the Spark 3.5 release.

Closes delta-io#1986

GitOrigin-RevId: b0e4a81b608a857e45ecba71b070309347616a30
This PR implements the LZ4_RAW codec introduced in Parquet format v2.9.0. Since the LZ4_RAW and SNAPPY codecs share a lot of logic, this patch moves the common parts into NonBlockedCompressor and NonBlockedDecompressor and makes the specific codecs extend them.

Added a TestLz4RawCodec test to make sure the new codec itself is correct. A sketch of the shared superclass shape follows.
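As a rough illustration of the refactoring described above, the shared compressor superclass might be shaped like this. This is a sketch under stated assumptions: the two abstract operations follow from the description, while the method names and surrounding bookkeeping are assumed, not the exact parquet-mr API.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.io.compress.Compressor;

// Shared, non-blocked (whole-buffer) compression logic; concrete codecs
// such as LZ4_RAW and SNAPPY only supply the two primitives below.
public abstract class NonBlockedCompressor implements Compressor {

  // Worst-case output size for the concrete codec, used to size buffers.
  protected abstract int maxCompressedLength(int byteSize);

  // Compress the whole input buffer in one call (no block splitting).
  protected abstract int compress(ByteBuffer uncompressed, ByteBuffer compressed)
      throws IOException;

  // setInput/needsInput/finish bookkeeping shared by all codecs would
  // live here in the real class.
}
```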