This repository has been archived by the owner on Dec 20, 2022. It is now read-only.

spark rdma IBV_WC_WR_FLUSH_ERR #23

Open
xluckly opened this issue Feb 18, 2019 · 4 comments

@xluckly

xluckly commented Feb 18, 2019

I am running Spark on YARN:
Spark 2.1.0
Hadoop 2.7.3

My Spark job normally completes correctly, but when the input data is large the RdmaShuffleManager fails. Please see the attached logs below.

Spark RDMA conf:
spark.driver.extraClassPath /home/bigdata/local/spark-rdma-3.1-for-spark-2.1.0-jar-with-dependencies.jar
spark.executor.extraClassPath /home/bigdata/local/spark-rdma-3.1-for-spark-2.1.0-jar-with-dependencies.jar
spark.shuffle.manager org.apache.spark.shuffle.rdma.RdmaShuffleManager
spark.shuffle.compress false
spark.shuffle.spill.compress false
spark.broadcast.compress false
spark.broadcast.checksum false
spark.locality.wait 0
spark.executor.extraLibraryPath /home/bigdata/local/rdma
spark.driver.extraLibraryPath /home/bigdata/local/rdma
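
For reference, a minimal sketch (not the exact job used here) of the same shuffle-related settings applied programmatically through SparkConf; the extraClassPath/extraLibraryPath entries generally still have to be supplied at launch time (e.g. in spark-defaults.conf or via spark-submit), since they cannot take effect once the JVM is already running:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Illustrative only: mirrors the spark-defaults.conf entries above.
val conf = new SparkConf()
  .set("spark.shuffle.manager", "org.apache.spark.shuffle.rdma.RdmaShuffleManager")
  .set("spark.shuffle.compress", "false")
  .set("spark.shuffle.spill.compress", "false")
  .set("spark.broadcast.compress", "false")
  .set("spark.broadcast.checksum", "false")
  .set("spark.locality.wait", "0")

val spark = SparkSession.builder().config(conf).getOrCreate()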

Logs:

INFO DAGScheduler: Executor lost: 2 (epoch 29)
19/02/18 10:18:10 INFO BlockManagerMasterEndpoint: Trying to remove executor 2 from BlockManagerMaster.
19/02/18 10:18:10 INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(2, jx-bd-hadoop523, 35995, None)
19/02/18 10:18:10 INFO BlockManagerMaster: Removed 2 successfully in removeExecutor
19/02/18 10:18:10 INFO DAGScheduler: Shuffle files lost for executor: 2 (epoch 29)
19/02/18 10:18:10 INFO ShuffleMapStage: ShuffleMapStage 5 is now unavailable on executor 2 (0/1, false)
19/02/18 10:18:10 INFO ShuffleMapStage: ShuffleMapStage 9 is now unavailable on executor 2 (0/1000, false)
19/02/18 10:18:10 INFO ShuffleMapStage: ShuffleMapStage 2 is now unavailable on executor 2 (0/1000, false)
19/02/18 10:18:10 INFO ShuffleMapStage: ShuffleMapStage 4 is now unavailable on executor 2 (0/2, false)
19/02/18 10:18:10 INFO ShuffleMapStage: ShuffleMapStage 1 is now unavailable on executor 2 (0/500, false)
19/02/18 10:18:12 WARN TransportChannelHandler: Exception in connection from /10.200.20.213:45426
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:221)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:899)
at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:275)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:652)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:575)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:489)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:451)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:748)
19/02/18 10:18:12 INFO YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 2.
19/02/18 10:18:12 ERROR YarnScheduler: Lost an executor 2 (already removed): Pending loss reason.
19/02/18 10:18:12 INFO BlockManagerMaster: Removal of executor 2 requested
19/02/18 10:18:12 INFO BlockManagerMasterEndpoint: Trying to remove executor 2 from BlockManagerMaster.
19/02/18 10:18:12 INFO YarnSchedulerBackend$YarnDriverEndpoint: Asked to remove non-existent executor 2

19/02/18 10:17:42 INFO MemoryStore: Block broadcast_28 stored as values in memory (estimated size 39.9 KB, free 7.0 GB)
19/02/18 10:17:42 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 0, fetching them
19/02/18 10:17:42 INFO MapOutputTrackerWorker: Doing the fetch; tracker endpoint = NettyRpcEndpointRef(spark://MapOutputTracker@10.200.20.213:41323)
19/02/18 10:17:42 INFO MapOutputTrackerWorker: Got the output locations
19/02/18 10:17:42 INFO RdmaShuffleManager: Getting MapTaskOutput table for shuffleId: 0 of size 8192 from driver
19/02/18 10:17:42 INFO RdmaShuffleManager: RDMA read mapTaskOutput table for shuffleId: 0 took 4 ms
19/02/18 10:17:42 INFO CodeGenerator: Code generated in 19.000389 ms
19/02/18 10:17:42 INFO CodeGenerator: Code generated in 13.038783 ms
19/02/18 10:17:42 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
19/02/18 10:17:42 INFO CodeGenerator: Code generated in 10.63309 ms
19/02/18 10:17:42 INFO DiskBlockManager: Shutdown hook called
19/02/18 10:17:42 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 12, fetching them
19/02/18 10:17:42 INFO MapOutputTrackerWorker: Doing the fetch; tracker endpoint = NettyRpcEndpointRef(spark://MapOutputTracker@10.200.20.213:41323)
19/02/18 10:17:42 INFO ShutdownHookManager: Shutdown hook called
19/02/18 10:17:42 ERROR Executor: Exception in task 14.0 in stage 9.0 (TID 1518)
java.io.IOException: Filesystem closed
at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:808)
at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:868)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:934)
at java.io.DataInputStream.read(DataInputStream.java:149)
at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.fillBuffer(UncompressedSplitLineReader.java:62)
at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.readLine(UncompressedSplitLineReader.java:94)
at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:248)
at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:48)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:266)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:211)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:166)
at org.apache.spark.shuffle.rdma.writer.wrapper.RdmaWrapperShuffleWriter.write(RdmaWrapperShuffleWriter.scala:102)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
19/02/18 10:17:42 ERROR Executor: Exception in task 12.0 in stage 9.0 (TID 1517)
java.io.IOException: Filesystem closed
at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:808)
at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:868)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:934)
at java.io.DataInputStream.read(DataInputStream.java:149)
at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.fillBuffer(UncompressedSplitLineReader.java:62)
at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.readLine(UncompressedSplitLineReader.java:94)
at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:248)
at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:48)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:266)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:211)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:166)
at org.apache.spark.shuffle.rdma.writer.wrapper.RdmaWrapperShuffleWriter.write(RdmaWrapperShuffleWriter.scala:102)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
19/02/18 10:17:42 ERROR Executor: Exception in task 8.0 in stage 9.0 (TID 1513)
java.io.IOException: Filesystem closed
at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:808)
at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:868)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:934)
at java.io.DataInputStream.read(DataInputStream.java:149)
at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.fillBuffer(UncompressedSplitLineReader.java:62)
at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.readLine(UncompressedSplitLineReader.java:94)
at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:248)
at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:48)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:266)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:211)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:166)
at org.apache.spark.shuffle.rdma.writer.wrapper.RdmaWrapperShuffleWriter.write(RdmaWrapperShuffleWriter.scala:102)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
19/02/18 10:17:42 INFO MapOutputTrackerWorker: Got the output locations
19/02/18 10:17:42 INFO RdmaShuffleManager: Getting MapTaskOutput table for shuffleId: 12 of size 16384 from driver
19/02/18 10:17:42 INFO RdmaShuffleManager: RDMA read mapTaskOutput table for shuffleId: 12 took 0 ms
19/02/18 10:17:42 INFO CoarseGrainedExecutorBackend: Got assigned task 1520
19/02/18 10:17:42 INFO Executor: Running task 1.0 in stage 3.0 (TID 1520)
19/02/18 10:17:42 INFO CoarseGrainedExecutorBackend: Got assigned task 1521
19/02/18 10:17:42 INFO Executor: Running task 2.0 in stage 3.0 (TID 1521)
19/02/18 10:17:42 INFO CoarseGrainedExecutorBackend: Got assigned task 1522
19/02/18 10:17:42 INFO Executor: Running task 3.0 in stage 3.0 (TID 1522)
19/02/18 10:17:42 INFO CodeGenerator: Code generated in 23.898935 ms

19/02/18 10:44:51 INFO BlockManagerInfo: Added broadcast_10_piece0 in memory on jx-bd-hadoop528:41859 (size: 3.8 KB, free: 7.3 GB)
19/02/18 10:44:51 INFO disni: createCompChannel, context 140379227583536
19/02/18 10:44:51 INFO disni: createCQ, objId 140369536012256, ncqe 4352
19/02/18 10:44:51 INFO disni: createQP, objId 140369536014024, send_wr size 4096, recv_wr_size 256
19/02/18 10:44:51 INFO disni: accept, id 0
19/02/18 10:44:51 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to 10.200.20.218:45240
19/02/18 10:44:52 WARN TaskSetManager: Lost task 0.0 in stage 5.2 (TID 8, jx-bd-hadoop528 executor 37): FetchFailed(null, shuffleId=0, mapId=-1, reduceId=0, message=
org.apache.spark.shuffle.MetadataFetchFailedException: RdmaShuffleNode: BlockManagerId(37, jx-bd-hadoop528, 41859, None) has no RDMA connection to BlockManagerId(21, jx-bd-hadoop528, 37576, None)
at org.apache.spark.shuffle.rdma.RdmaShuffleFetcherIterator$$anonfun$startAsyncRemoteFetches$1$$anonfun$apply$4.apply(RdmaShuffleFetcherIterator.scala:214)
at org.apache.spark.shuffle.rdma.RdmaShuffleFetcherIterator$$anonfun$startAsyncRemoteFetches$1$$anonfun$apply$4.apply(RdmaShuffleFetcherIterator.scala:203)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.shuffle.rdma.RdmaShuffleFetcherIterator$$anonfun$startAsyncRemoteFetches$1.apply(RdmaShuffleFetcherIterator.scala:203)
at org.apache.spark.shuffle.rdma.RdmaShuffleFetcherIterator$$anonfun$startAsyncRemoteFetches$1.apply(RdmaShuffleFetcherIterator.scala:185)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

)
19/02/18 10:44:52 INFO TaskSetManager: Task 0.0 in stage 5.2 (TID 8) failed, but another instance of the task has already succeeded, so not re-queuing the task to be re-executed.
19/02/18 10:44:52 INFO DAGScheduler: Marking ResultStage 5 (processCmd at CliDriver.java:376) as failed due to a fetch failure from ShuffleMapStage 4 (processCmd at CliDriver.java:376)
19/02/18 10:44:52 INFO YarnScheduler: Removed TaskSet 5.2, whose tasks have all completed, from pool
19/02/18 10:44:52 INFO DAGScheduler: ResultStage 5 (processCmd at CliDriver.java:376) failed in 1.026 s due to org.apache.spark.shuffle.MetadataFetchFailedException: RdmaShuffleNode: BlockManagerId(37, jx-bd-hadoop528, 41859, None) has no RDMA connection to BlockManagerId(21, jx-bd-hadoop528.zeus, 37576, None)
at org.apache.spark.shuffle.rdma.RdmaShuffleFetcherIterator$$anonfun$startAsyncRemoteFetches$1$$anonfun$apply$4.apply(RdmaShuffleFetcherIterator.scala:214)
at org.apache.spark.shuffle.rdma.RdmaShuffleFetcherIterator$$anonfun$startAsyncRemoteFetches$1$$anonfun$apply$4.apply(RdmaShuffleFetcherIterator.scala:203)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.shuffle.rdma.RdmaShuffleFetcherIterator$$anonfun$startAsyncRemoteFetches$1.apply(RdmaShuffleFetcherIterator.scala:203)
at org.apache.spark.shuffle.rdma.RdmaShuffleFetcherIterator$$anonfun$startAsyncRemoteFetches$1.apply(RdmaShuffleFetcherIterator.scala:185)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

INFO TaskSetManager: Task 134.0 in stage 13.0 (TID 2417) failed, but another instance of the task has already succeeded, so not re-queuing the task to be re-executed.
WARN TaskSetManager: Lost task 33.0 in stage 13.0 (TID 2333, jx-bd-hadoop526, executor 26): FetchFailed(BlockManagerId(48, jx-bd-hadoop530, 42158, None),
shuffleId=14, mapId=0, reduceId=33, message=
org.apache.spark.shuffle.FetchFailedException: RDMA Send/Write/Read WR completed with error: IBV_WC_WR_FLUSH_ERR

xluckly closed this as completed on Feb 18, 2019
xluckly reopened this on Feb 18, 2019
@petro-rudenko
Member

Hi, how many physical nodes do you have, and what is the hardware on each (number of CPUs, RAM, NIC, disks)? It looks like you are running out of resources. Can you please try running with spark.shuffle.rdma.useOdp true?
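
(As a sketch of how that suggestion would be applied: add the line spark.shuffle.rdma.useOdp true alongside the spark-defaults.conf entries shown above, or pass --conf spark.shuffle.rdma.useOdp=true to spark-submit.)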

@xluckly
Author

xluckly commented Feb 18, 2019

Hi, how many physical nodes do you have, and what is the hardware on each (number of CPUs, RAM, NIC, disks)? It looks like you are running out of resources. Can you please try running with spark.shuffle.rdma.useOdp true?

CPUs: 10
RAM: 192 GB
Nodes: 10
Network bandwidth: 20G

@xluckly
Author

xluckly commented Feb 20, 2019

Hi, how many physical nodes do you have, and what is the hardware on each (number of CPUs, RAM, NIC, disks)? It looks like you are running out of resources. Can you please try running with spark.shuffle.rdma.useOdp true?

With spark.shuffle.rdma.useOdp true set, the problem still exists.

@petro-rudenko
Member

Do you have the YARN logs for the executors?
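
(For reference, if YARN log aggregation is enabled, the executor logs can usually be collected after the application finishes with yarn logs -applicationId <application_id>.)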
