
Stream is corrupted during shuffle read with RDMA Shuffle Manager #34

tobegit3hub opened this issue Jul 8, 2019 · 8 comments


tobegit3hub commented Jul 8, 2019

We have set up an RDMA environment and run Spark jobs with the RDMA Shuffle Manager. Here is the spark-submit command used to submit the job.

${SPARK_HOME}/bin/spark-submit \
        --executor-memory 7200m \
        --master yarn \
        --num-executors 40 \
        --files /root/spark-benchmark/tools/SparkRDMA/libdisni.so,/root/spark-benchmark/tools/SparkRDMA/spark-rdma-3.1-for-spark-2.3.0-jar-with-dependencies.jar \
        --executor-cores 1 \
        --deploy-mode cluster \
        --driver-memory 4g \
        --conf spark.eventLog.dir=hdfs:///spark_benchmark/event_log/2.3.0 \
        --conf spark.yarn.jars=hdfs:///spark_benchmark/dependency/spark230_jars/*.jar \
        --conf spark.shuffle.spill.compress=false \
        --conf spark.executor.extraClassPath=spark-rdma-3.1-for-spark-2.3.0-jar-with-dependencies.jar \
        --conf spark.shuffle.manager=org.apache.spark.shuffle.rdma.RdmaShuffleManager \
        --conf spark.driver.extraLibraryPath=./ \
        --conf spark.hadoop.yarn.timeline-service.enabled=false \
        --conf spark.eventLog.enabled=true \
        --conf spark.driver.extraClassPath=spark-rdma-3.1-for-spark-2.3.0-jar-with-dependencies.jar \
        --conf spark.executor.extraLibraryPath=./ \
        --conf spark.executor.memoryOverhead=128g \
        --conf spark.shuffle.compress=true \
        --conf spark.sql.shuffle.partitions=340 \
        --conf spark.task.maxFailures=1 \
        --conf spark.yarn.maxAppAttempts=1 \
        --class com.example.SparkApp

This script works at times, but sometimes a task fails, although the failed task may be re-scheduled and eventually complete. Here is the error log of the failed task. Since we have set spark.shuffle.compress=true, the default LZ4 codec throws a "Stream is corrupted" exception.

org.apache.spark.SparkException: Task failed while writing rows
	at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:151)
	at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:79)
	at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:78)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Stream is corrupted
	at net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:202)
	at net.jpountz.lz4.LZ4BlockInputStream.read(LZ4BlockInputStream.java:157)
	at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
	at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
	at java.io.DataInputStream.readInt(DataInputStream.java:387)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.readSize(UnsafeRowSerializer.scala:113)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.<init>(UnsafeRowSerializer.scala:120)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2.asKeyValueIterator(UnsafeRowSerializer.scala:110)
	at org.apache.spark.shuffle.rdma.RdmaShuffleReader$$anonfun$4.apply(RdmaShuffleReader.scala:68)
	at org.apache.spark.shuffle.rdma.RdmaShuffleReader$$anonfun$4.apply(RdmaShuffleReader.scala:64)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:124)
	at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:123)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
	at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:135)
	... 8 more

If we set spark.shuffle.compress=false, the error is thrown by readFully when it tries to deserialize the stream into row objects.

Job aborted due to stage failure: Task 3 in stage 4.0 failed 1 times, most recent failure: Lost task 3.0 in stage 4.0 (TID 223, lnode6.leap.com, executor 1): org.apache.spark.SparkException: Task failed while writing rows
	at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:151)
	at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:79)
	at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:78)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException: reached end of stream after reading 723048 bytes; 825504310 bytes expected
	at org.spark_project.guava.io.ByteStreams.readFully(ByteStreams.java:735)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:127)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:110)
	at scala.collection.Iterator$$anon$12.next(Iterator.scala:444)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
	at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:125)
	at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:123)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
	at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:135)
	... 8 more

The SparkApp is simple: we read data from HDFS and do some transformations before calling saveAsTextFile to write the results back to HDFS. The failed task always happens in the final stage.
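For context, the job has roughly the following shape (a minimal sketch only; the paths and the transformation below are placeholders, not the real SparkApp code):

import org.apache.spark.sql.SparkSession

object SparkApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("SparkApp").getOrCreate()

    // Read text data from HDFS (placeholder path).
    val input = spark.sparkContext.textFile("hdfs:///spark_benchmark/input")

    // Some transformation; the real one is more involved.
    val transformed = input.map(_.toUpperCase)

    // The failing tasks always show up in this final write stage.
    transformed.saveAsTextFile("hdfs:///spark_benchmark/output")

    spark.stop()
  }
}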

By the way, this issue may not be easy to reproduce. In our environment we use a custom codec to save the RDD, which may increase the probability of hitting this issue. However, the codec cannot access or break the stream before the RDMA Shuffle Manager performs the shuffle read, so we still have no clue about the root cause of this issue.

petro-rudenko (Member) commented

This seems to be an issue with the LZ4 codec:
https://issues.apache.org/jira/browse/SPARK-18105

Can you please try setting spark.io.compression.codec to lzf, snappy, or zstd?
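For example, a minimal sketch of overriding the codec via SparkConf (snappy is used here purely as an illustration; any of the three should do):

// Hypothetical snippet: pick a shuffle compression codec other than the default lz4.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.shuffle.compress", "true")
  // any of "lzf", "snappy" or "zstd"
  .set("spark.io.compression.codec", "snappy")

Equivalently, it can be passed as an extra --conf spark.io.compression.codec=snappy in the spark-submit script above.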

tobegit3hub (Author) commented

Thanks @petro-rudenko. We have tried the lz4 codec in this scenario and it works with SparkRDMA.

tobegit3hub (Author) commented

By the way, we got another error after the shuffle read when compression was disabled, so it is definitely not an issue with the compression implementation.

petro-rudenko (Member) commented

What's the error you got?


tobegit3hub commented Jul 11, 2019

@petro-rudenko The error message with compression disabled is in the description of this issue, the third code snippet of #34 (comment).

tobegit3hub (Author) commented

We have communicated with the FPGA experts, and they said that the initialization of each FPGA process requests and resets some memory, which may affect the shuffle read data. We have seen that the corrupted data contained many zero bytes, which looks like freshly initialized memory.

I'm not sure if it is related yet. But the RDMA Shuffle Manager could check or print the shuffle read data. If the data is not corrupted inside the RDMA Shuffle Manager but is corrupted in the Spark executor's memory, which may have been reinitialized by the FPGA code, it could be an issue of client-side memory management.

We have seen that the official Spark shuffle manager has a stream-corruption check for each shuffle; is it possible to add similar code to the RDMA shuffle manager? @petro-rudenko
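Something like the following is what we have in mind, roughly what vanilla Spark does when spark.shuffle.detectCorrupt is enabled (a minimal sketch only; wrapForCompression is a hypothetical hook for the configured codec, not an existing SparkRDMA API):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream}

object ShuffleBlockCheck {
  // Buffer a fetched shuffle block, decompress it once to surface corruption
  // early, then hand a fresh stream to the row serializer.
  def verifyAndReopen(fetched: InputStream,
                      wrapForCompression: InputStream => InputStream): InputStream = {
    // Copy the whole fetched block into memory first.
    val raw = new ByteArrayOutputStream()
    val buf = new Array[Byte](64 * 1024)
    var n = fetched.read(buf)
    while (n != -1) {
      raw.write(buf, 0, n)
      n = fetched.read(buf)
    }
    val bytes = raw.toByteArray

    // Decompress the buffered block once: a corrupted block fails here with
    // "Stream is corrupted" instead of deep inside UnsafeRowSerializer.
    val check = wrapForCompression(new ByteArrayInputStream(bytes))
    while (check.read(buf) != -1) {}
    check.close()

    // The block passed the check; reopen it for the serializer.
    wrapForCompression(new ByteArrayInputStream(bytes))
  }
}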


petro-rudenko commented Jul 17, 2019

@tobegit3hub Yes, I saw this feature. We are in the process of migrating SparkRDMA to a more performant network backend based on the UCX library. There we wouldn't reimplement the default Spark iterator; rather, all the logic would live in the ShuffleClient. I'll check how easy it would be to add a corruption checker there. cc @yosefe

tobegit3hub (Author) commented

Thanks @petro-rudenko. That may help, and we can test the new client in our environment.
