[SPARK-4195][Core] Retry fetching blocks when the fetch failure's reason is a connection timeout #3061
Conversation
Test build #22761 has started for PR 3061 at commit
Test build #22761 has finished for PR 3061 at commit
Test FAILed.
@aarondav we should incorporate this into the new transport thing
```
@@ -39,6 +41,10 @@ final class NioBlockTransferService(conf: SparkConf, securityManager: SecurityMa

  private var blockDataManager: BlockDataManager = _

  private val blockFailedCounts = new HashMap[Seq[String], Int]
```
To avoid memory leaks, we need to be sure that this won't grow without bound. Let me try to walk through the cases...
- If no errors occur, this will remain empty since entries are only added on error.
- If a fetch fails and a retry succeeds, then the entry is removed from this map.
- If the maximum number of attempts is exceeded, we don't remove an entry from this map.
So, looks like this adds a memory leak?
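To make the cleanup requirement concrete, here is a minimal sketch of the bookkeeping under discussion (`FetchRetryTracker` and its method names are invented for illustration, not the PR's actual code): the failure counter must be removed on every exit path, including the give-up path.

```scala
import scala.collection.mutable

// Minimal sketch of a per-request failure counter that avoids the leak:
// the entry is removed both when a retry succeeds and when we give up.
class FetchRetryTracker(maxRetries: Int) {
  private val blockFailedCounts = mutable.HashMap[Seq[String], Int]()

  // Returns true if the caller should retry fetching these blocks.
  def shouldRetry(blockIds: Seq[String]): Boolean = synchronized {
    val count = blockFailedCounts.getOrElse(blockIds, 0) + 1
    if (count < maxRetries) {
      blockFailedCounts(blockIds) = count
      true
    } else {
      // Without this removal the entry would live forever once the
      // maximum number of attempts is exceeded -- the leak noted above.
      blockFailedCounts.remove(blockIds)
      false
    }
  }

  // Must be called when a retry succeeds, so the entry is cleared.
  def onFetchSuccess(blockIds: Seq[String]): Unit = synchronized {
    blockFailedCounts.remove(blockIds)
  }
}
```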
Yes, thank you for finding this error.
There are a bunch of minor style issues here; I don't want to comment on them individually, so please take a look at https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide.
```scala
blockIds.foreach { blockId =>
  listener.onBlockFetchFailure(blockId, exception)
  exception match {
    case connectExcpt: IOException =>
```
Why not catch ConnectException? I suppose it's always safe to retry as long as the number of retries is bounded, but it's probably better to catch the narrower exception if we're only trying to deal with connection establishment errors.
Because ConnectionManager.scala does not catch ConnectException at https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala#L963; it throws an IOException instead. So here we can only catch IOException. If we want to handle only connection errors, ConnectionManager.scala would need to catch ConnectException first.
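To illustrate the trade-off both comments are circling (a hypothetical sketch with invented names, not code from either file): matching on the broad IOException retries any I/O failure, while matching narrowly only works if the lower layer preserves the ConnectException, for example as the IOException's cause.

```scala
import java.io.IOException
import java.net.ConnectException

object RetryDecision {
  // Broad match: what the PR can do today, since ConnectionManager
  // rethrows a plain IOException with no cause attached.
  def shouldRetryBroad(exception: Throwable): Boolean = exception match {
    case _: IOException => true // retries on any I/O failure
    case _              => false
  }

  // Narrow match: only possible if ConnectionManager kept the original
  // exception, e.g. `throw new IOException("connect failed", cause)`.
  def shouldRetryNarrow(exception: Throwable): Boolean = exception match {
    case _: ConnectException => true
    case e: IOException if e.getCause.isInstanceOf[ConnectException] => true
    case _ => false
  }
}
```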
Test build #24757 has started for PR 3061 at commit
Test build #24757 has finished for PR 3061 at commit
Test FAILed.
Actually let's close this one. Based on Tencent's feedback, we have already implemented the same functionality in Netty. I'm not sure whether it is worth it to fix the current connection manager that very few people understand.
OK. I will close this PR. |
When there are many executors in an application (for example, 1000), connection timeouts often occur.
The exception is:
```
WARN nio.SendingConnection: Error finishing connection
java.net.ConnectException: Connection timed out
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
    at org.apache.spark.network.nio.SendingConnection.finishConnect(Connection.scala:342)
    at org.apache.spark.network.nio.ConnectionManager$$anon$11.run(ConnectionManager.scala:273)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
```
This makes the driver think these executors are lost, when in fact they are alive. So this PR adds a retry mechanism to reduce the probability of this problem occurring. @rxin
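For context, the retry idea in rough form (a sketch with invented names, not the PR's implementation): re-issue a failed fetch a bounded number of times before surfacing the failure, so a transient connection timeout does not cause the driver to mark a live executor as lost.

```scala
import java.io.IOException

object FetchRetry {
  // Re-run `fetchOnce` up to `maxRetries` extra times on IOException,
  // with a simple linear backoff between attempts.
  def fetchWithRetry(blockIds: Seq[String], maxRetries: Int)
                    (fetchOnce: Seq[String] => Unit): Unit = {
    var attempt = 0
    var done = false
    while (!done) {
      try {
        fetchOnce(blockIds)
        done = true
      } catch {
        case _: IOException if attempt < maxRetries =>
          attempt += 1
          Thread.sleep(1000L * attempt) // back off before retrying
        // once retries are exhausted, the IOException propagates
      }
    }
  }
}
```

Spark ultimately shipped this behavior in the Netty transport instead, as noted in the closing comment above.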