
[SPARK-10949] Update Snappy version to 1.1.2 #8995

Closed
wants to merge 5 commits into from

Conversation

a-roberts
Contributor

Snappy now supports concatenation of serialized streams. This patch contains a version number change, and the "does not support" test is now a "supports" test.

Snappy 1.1.2 changelog mentions:
snappy-java-1.1.2 (22 September 2015)
This is a backward compatible release for 1.1.x.
Add AIX (32-bit) support.
There is no upgrade for the native libraries of the other platforms.

A major change since 1.1.1 is a support for reading concatenated results of SnappyOutputStream(s)
snappy-java-1.1.2-RC2 (18 May 2015)
Fix #107: SnappyOutputStream.close() is not idempotent
snappy-java-1.1.2-RC1 (13 May 2015)
SnappyInputStream now supports reading concatenated compressed results of SnappyOutputStream
There has been no compressed format change since 1.0.5.x, so you can read the compressed results interchangeably between these versions.
Fixes a problem when java.io.tmpdir does not exist.

From https://github.com/xerial/snappy-java/blob/develop/Milestone.md, up to date at the time of this pull request.
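For context, the behaviour described above can be sketched against the snappy-java API roughly as follows (a minimal, illustrative example rather than code from this patch; it concatenates two independently compressed streams and reads them back with a single SnappyInputStream):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}

object ConcatenationSketch {
  def main(args: Array[String]): Unit = {
    val combined = new ByteArrayOutputStream()

    // Write two independently compressed streams back to back.
    for (chunk <- Seq("first block ", "second block")) {
      val snappyOut = new SnappyOutputStream(combined)
      snappyOut.write(chunk.getBytes("UTF-8"))
      snappyOut.close() // safe to close each wrapper; ByteArrayOutputStream.close() is a no-op
    }

    // With snappy-java 1.1.2, one SnappyInputStream can read across the
    // boundary between the two concatenated streams.
    val snappyIn = new SnappyInputStream(new ByteArrayInputStream(combined.toByteArray))
    val decompressed = scala.io.Source.fromInputStream(snappyIn, "UTF-8").mkString
    assert(decompressed == "first block second block")
  }
}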

Also note xerial/snappy-java#103
"@xerial not sure how feasible or likely it is for this to happen, but it'd help tremendously Spark's performance because we are experimenting with a new shuffle path that uses channel.transferTo to avoid user space copying. However, for that to work, we'd need the underlying format to support concatenation. As far we know, LZF has this property, and Snappy might also have it (but snappy-java implementation doesn't support it)."

It would be useful to have this in both the 1.5 and 1.6 branches.

@JoshRosen
Contributor

Snappy upgrades have historically been a cause of bugs, so I'm going to veto putting this into 1.5.2. Let's definitely consider it for Spark 1.6, though.

Jenkins, this is ok to test.

@JoshRosen
Contributor

By the way, in addition to the changes here, we need to update code elsewhere in order to benefit from the concatenation of serialized streams. For the Tungsten shuffle write path, the right line to change is

!compressionEnabled || compressionCodec instanceof LZFCompressionCodec;

Rather than changing this here, though, I'd prefer to do something similar to what I did for Serializer, defining a private API to let instances express whether they have this fast-merging property:

private[spark] def supportsRelocationOfSerializedObjects: Boolean = false
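A rough illustration of that instance-flag pattern, with placeholder names rather than actual Spark code (the thread below ends up choosing a helper on the CompressionCodec companion object instead):

// Codecs default to not supporting concatenation and opt in by overriding the flag.
trait ConcatenationAwareCodec {
  def supportsConcatenationOfSerializedStreams: Boolean = false
}

// A codec whose output can safely be concatenated (e.g. LZF, or Snappy with
// snappy-java 1.1.2) overrides the flag to true.
class SketchSnappyCodec extends ConcatenationAwareCodec {
  override def supportsConcatenationOfSerializedStreams: Boolean = true
}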

@SparkQA

SparkQA commented Oct 6, 2015

Test build #1845 has finished for PR 8995 at commit 352bb3d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor

Any update here? Will you have time to address my comments?

@a-roberts
Contributor Author

Josh, apologies for the late response here; I want to make sure I'm understanding your suggestion.

So for your first comment, the naive way would be to add SnappyCompressionCodec such that:

final boolean fastMergeIsSupported = !compressionEnabled ||
    compressionCodec instanceof LZFCompressionCodec ||
    compressionCodec instanceof SnappyCompressionCodec;

but this isn't scalable in the long term: it would be prone to error as we support more codecs, and it feels hacky, requiring this line to be modified whenever new functionality or new codecs become available. Having said that, we already have hard-coded compression codec names in CompressionCodec.scala...

With your proposal we'd add

@Private
private[spark] def supportsSerializedStreams: Boolean = false 

Alternatively, we could make "supportsSerializedStreams" a required method, so that when a user defines their own codec to be used with Spark, they would have to implement it.

@JoshRosen
Contributor

Hey @a-roberts,

How about this:

  • Add a private[spark] method to the private[spark] CompressionCodec companion object and have that method maintain the hardcoded list of compression codecs which support concatenation of serialized streams. This method should accept a CompressionCodec instance and perform the instanceof check. I'd consider naming this something like "supportsConcatenationOfSerializedStreams" to be very explicit and clear.
  • Update fastMergeIsSupported to use this new static method.

I like this approach since it makes it very clear why we're only supporting those two codecs.

I wouldn't worry about third-party / external compression codecs being able to take advantage of this feature.

@a-roberts
Contributor Author

Cheers Josh, makes sense. I'm going to test this on our systems before updating the PR with the changes; here's what I've added (the last parts of each code block).

In core/src/main/scala/org/apache/spark/io/CompressionCodec.scala

private[spark] object CompressionCodec {

  private val configKey = "spark.io.compression.codec"
  private val shortCompressionCodecNames = Map(
    "lz4" -> classOf[LZ4CompressionCodec].getName,
    "lzf" -> classOf[LZFCompressionCodec].getName,
    "snappy" -> classOf[SnappyCompressionCodec].getName)


  private[spark] def supportsConcatenationOfSerializedStreams(codec: CompressionCodec): Boolean = {
    codec.isInstanceOf[SnappyCompressionCodec] || codec.isInstanceOf[LZFCompressionCodec]
  }

In core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java

private long[] mergeSpills(SpillInfo[] spills) throws IOException {
    final File outputFile = shuffleBlockResolver.getDataFile(shuffleId, mapId);
    final boolean compressionEnabled = sparkConf.getBoolean("spark.shuffle.compress", true);
    final CompressionCodec compressionCodec = CompressionCodec$.MODULE$.createCodec(sparkConf);
    final boolean fastMergeEnabled =
      sparkConf.getBoolean("spark.shuffle.unsafe.fastMergeEnabled", true);

    final boolean fastMergeIsSupported = !compressionEnabled || 
      CompressionCodec$.MODULE$.supportsConcatenationOfSerializedStreams(compressionCodec);

@JoshRosen
Contributor

That plan sounds fine to me.

Add known compression codecs that support concatenation of serialized streams
Update fastMergeIsSupported so we can support concatenation of serialized streams
@JoshRosen
Contributor

Jenkins, retest this please.

@SparkQA

SparkQA commented Oct 17, 2015

Test build #43890 has finished for PR 8995 at commit 3d650c8.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Scalastyle fixes for whitespace
@JoshRosen
Contributor

Jenkins, retest this please.

@SparkQA

SparkQA commented Oct 18, 2015

Test build #43899 has finished for PR 8995 at commit 1949bcb.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor

Jenkins, this is ok to test.

@JoshRosen
Contributor

(I'll try to see if I can get Jenkins to just auto-retest this...)

@SparkQA

SparkQA commented Oct 20, 2015

Test build #43944 has finished for PR 8995 at commit 0f87052.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor

Jenkins, retest this please.

@SparkQA

SparkQA commented Oct 20, 2015

Test build #43992 has finished for PR 8995 at commit 0f87052.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor

Hey @a-roberts, any chance that you could fix the merge conflicts here so that I can re-test? I'd like to get this patch in soon so that users can benefit from the faster shuffle spill merging out of the box.

@srowen
Member

srowen commented Oct 27, 2015

@a-roberts are you still working on this?

@JoshRosen
Contributor

I'd really like to get this in; @a-roberts, could you let us know if you no longer plan to work on this so that someone else could take over?

@a-roberts
Contributor Author

Hi, I have been preparing for and enjoying Spark Summit Europe, so it's better for somebody else to take this over; it looks like the files I changed have moved around since testing, so I imagine it won't take long anyway.

@JoshRosen
Contributor

I've opened #9439 to take this over. @a-roberts, do you mind closing this one for now?

@asfgit asfgit closed this in 701fb50 Nov 4, 2015