
[SPARK-3019] Pluggable block transfer interface (BlockTransferService) #2240

Closed
wants to merge 13 commits

Conversation

rxin
Contributor

@rxin rxin commented Sep 2, 2014

This pull request creates a new BlockTransferService interface for block fetch/upload and refactors the existing ConnectionManager to implement BlockTransferService (NioBlockTransferService).

Most of the changes are simply moving code around. The main class to inspect is ShuffleBlockFetcherIterator.

Review guide:

  • Most of the ConnectionManager code is now in network.cm package
  • ManagedBuffer is a new buffer abstraction backed by several different implementations (file segment, nio ByteBuffer, Netty ByteBuf)
  • BlockTransferService is the main internal interface introduced in this PR
  • NioBlockTransferService implements BlockTransferService and replaces the old BlockManagerWorker
  • ShuffleBlockFetcherIterator replaces the old BlockFetcherIterator and uses the new interface

TODOs that should be separate PRs:

  • Implement NettyBlockTransferService
  • Finalize the API/semantics for ManagedBuffer.release()
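Conceptually, the new abstraction decouples block fetch/upload callers from the underlying transport, so NIO- and Netty-based implementations can be swapped behind one interface. As a rough illustration only (Java here; the actual PR defines a Scala trait, and the method names and in-memory implementation below are hypothetical, not the PR's API):

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a pluggable block-transfer abstraction: callers
// program against the interface, and different transports (NIO, Netty)
// can be plugged in behind it.
interface BlockTransferService {
    ByteBuffer fetchBlock(String host, int port, String blockId);
    void uploadBlock(String host, int port, String blockId, ByteBuffer data);
}

// Trivial in-memory implementation, standing in for a real transport
// such as NioBlockTransferService. Illustrative only.
class InMemoryBlockTransferService implements BlockTransferService {
    private final Map<String, ByteBuffer> store = new HashMap<>();

    @Override
    public ByteBuffer fetchBlock(String host, int port, String blockId) {
        ByteBuffer buf = store.get(blockId);
        if (buf == null) {
            throw new IllegalArgumentException("unknown block " + blockId);
        }
        // Duplicate so callers get independent position/limit over shared contents.
        return buf.duplicate();
    }

    @Override
    public void uploadBlock(String host, int port, String blockId, ByteBuffer data) {
        store.put(blockId, data);
    }
}
```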

…rvice

Conflicts:
	core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
	core/src/main/scala/org/apache/spark/storage/BlockManager.scala
	core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
@SparkQA

SparkQA commented Sep 2, 2014

QA tests have started for PR 2240 at commit 07ccf0d.

  • This patch does not merge cleanly!

@SparkQA

SparkQA commented Sep 2, 2014

QA tests have finished for PR 2240 at commit 07ccf0d.

  • This patch fails unit tests.
  • This patch does not merge cleanly!

Conflicts:
	core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
	core/src/main/scala/org/apache/spark/storage/BlockManager.scala
	core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala
	core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
	core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@SparkQA

SparkQA commented Sep 3, 2014

QA tests have started for PR 2240 at commit 2a907e4.

  • This patch merges cleanly.

@rxin
Contributor Author

rxin commented Sep 3, 2014

@colorant can you take a look at this since it interfaces with some of your changes?

@SparkQA

SparkQA commented Sep 3, 2014

QA tests have finished for PR 2240 at commit 2a907e4.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

override def size: Long = length

override def byteBuffer(): ByteBuffer = {
val channel = new RandomAccessFile(file, "r").getChannel
Contributor

Should we add a minMemoryMapBytes check here so that small files are read directly instead of memory-mapped?

Contributor Author

The problem is configuration propagation. I will add a TODO to enable that.

@shivaram
Contributor

shivaram commented Sep 3, 2014

I took a very brief look -- I'll take a closer look when I find some more time. But here are a couple of high-level comments:

  1. It would be better IMHO to split this into two PRs, one just for renaming / moving classes and the other for adding the new stuff. That would make reviewing much easier (for example, I don't know whether any code changed in ShuffleBlockFetcherIterator right now).
  2. I am not a big fan of the package name cm. I know connectionmanager is a mouthful, but my suggestion would actually be to keep everything in network until we figure out how to split things into sub-packages. I think ideally we'd like common traits / abstract classes to be in network/ and then have netty and nio etc. as sub-packages (the connection manager uses nio, right?)


initialize()

private[this] def sendRequest(req: FetchRequest) {
Contributor Author

note to reviewers: this is the function that changed.

1. Rename package name from cm to nio.

2. Refined BlockTransferService and ManagedBuffer interfaces.
@rxin
Contributor Author

rxin commented Sep 3, 2014

I talked to @shivaram offline. Breaking this into two PRs is not really going to simplify review, because the renaming part is very small -- only about 50 lines of changed code.

The main thing to review is the high level interface (which we can change later also since this is an internal interface), and the sendRequest function in ShuffleBlockFetcherIterator.
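The batching idea behind that part of the iterator can be sketched briefly: remote blocks are grouped into fetch requests, each capped at a fraction of the in-flight byte budget so several requests can be outstanding at once. The following is a simplified, hypothetical Java sketch of that grouping logic, not the PR's actual Scala code (all names are illustrative):

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch of grouping remote blocks into fetch requests: each
// request is capped at roughly maxBytesInFlight / 5 so that up to five
// requests can be in flight at once. Names are illustrative only.
class FetchRequestBatcher {
    static final class Block {
        final String id;
        final long size;
        Block(String id, long size) { this.id = id; this.size = size; }
    }

    static final class FetchRequest {
        final List<Block> blocks;
        final long totalSize;
        FetchRequest(List<Block> blocks, long totalSize) {
            this.blocks = blocks;
            this.totalSize = totalSize;
        }
    }

    static List<FetchRequest> batch(List<Block> blocks, long maxBytesInFlight) {
        long targetRequestSize = Math.max(maxBytesInFlight / 5, 1);
        List<FetchRequest> requests = new ArrayList<>();
        List<Block> current = new ArrayList<>();
        long currentSize = 0;
        for (Block b : blocks) {
            current.add(b);
            currentSize += b.size;
            if (currentSize >= targetRequestSize) {
                // Close this request once it reaches the per-request target.
                requests.add(new FetchRequest(current, currentSize));
                current = new ArrayList<>();
                currentSize = 0;
            }
        }
        if (!current.isEmpty()) {
            requests.add(new FetchRequest(current, currentSize));
        }
        return requests;
    }
}
```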

@SparkQA

SparkQA commented Sep 3, 2014

QA tests have started for PR 2240 at commit e29c721.

  • This patch merges cleanly.

throw new IllegalStateException(getClass.getName + " has not been initialized")
}

private def onBlockMessageReceive(msg: Message, id: ConnectionManagerId): Option[Message] = {
Contributor Author

note to reviewer: everything after this line is copied from old BlockManagerWorker code

@SparkQA

SparkQA commented Sep 3, 2014

QA tests have finished for PR 2240 at commit e29c721.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

)
}

private[this] def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest] = {
Contributor Author

note to reviewer: everything after this line in this file is copied from the old code. no need to review

@SparkQA

SparkQA commented Sep 3, 2014

QA tests have started for PR 2240 at commit 2960c93.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Sep 3, 2014

QA tests have finished for PR 2240 at commit 2960c93.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 3, 2014

QA tests have started for PR 2240 at commit 1332156.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Sep 3, 2014

QA tests have finished for PR 2240 at commit 1332156.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Conflicts:
	core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
@rxin
Contributor Author

rxin commented Sep 8, 2014

Jenkins, test this please.

Some(segment.nioByteBuffer())
}

override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
Contributor Author

Note to reviewer: This is changed to return a ManagedBuffer, which can return an input stream directly over the underlying file, rather than copying the entire file content into a ByteBuffer and creating an input stream from that buffer.
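The point of that change can be illustrated with a small sketch: instead of reading a whole file segment into a ByteBuffer and wrapping it, a file-backed buffer can hand out an InputStream positioned at the segment's offset, so bytes are streamed on demand. The class and method names below are illustrative, not the PR's ManagedBuffer API:

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;

// Illustrative file-segment buffer: rather than copying the segment's
// bytes into memory up front, expose an InputStream over the underlying
// file, skipped forward to the segment's offset.
class FileSegmentBuffer {
    private final File file;
    private final long offset;
    private final long length;

    FileSegmentBuffer(File file, long offset, long length) {
        this.file = file;
        this.offset = offset;
        this.length = length;
    }

    long size() { return length; }

    InputStream inputStream() throws IOException {
        FileInputStream in = new FileInputStream(file);
        long toSkip = offset;
        while (toSkip > 0) {
            long skipped = in.skip(toSkip);
            if (skipped <= 0) {
                in.close();
                throw new IOException("could not seek to offset " + offset);
            }
            toSkip -= skipped;
        }
        return in;  // caller should read at most `length` bytes
    }
}
```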

@SparkQA

SparkQA commented Sep 8, 2014

QA tests have started for PR 2240 at commit 64cd9d7.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Sep 8, 2014

QA tests have finished for PR 2240 at commit 64cd9d7.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin
Contributor Author

rxin commented Sep 8, 2014

Ok, I talked to @pwendell and @shivaram offline. I'm going to merge this for now since the pull request is growing larger. Feel free to continue commenting on the pull request, and I will address these comments in separate pull requests.

@asfgit asfgit closed this in 08ce188 Sep 8, 2014
aarondav added a commit to aarondav/spark that referenced this pull request Oct 10, 2014
This PR encapsulates apache#2330, which is itself a continuation of apache#2240. The first goal of this
PR is to provide an alternate, simpler implementation of the ConnectionManager which is based on Netty.

In addition to this goal, however, we want to resolve [SPARK-3796](https://issues.apache.org/jira/browse/SPARK-3796), which calls for a
standalone shuffle service which can be integrated into the YARN NodeManager, Standalone Worker, or
on its own. This PR makes the first step in this direction by ensuring that the actual Netty service
is as small as possible and extracted from Spark core. Given this, we should be able to construct
this standalone jar which can be included in other JVMs without incurring significant dependency or
runtime issues. The actual work to ensure that such a standalone shuffle service would work in Spark
will be left for a future PR, however.

In order to minimize dependencies and allow for the service to be long-running (possibly
much longer-running than Spark, and possibly having to support multiple versions of Spark
simultaneously), the entire service has been ported to Java, where we have full control
over the binary compatibility of the components and do not depend on the Scala runtime or
version.

These PRs have been addressed by folding in apache#2330:

SPARK-3453: Refactor Netty module to use BlockTransferService interface
SPARK-3018: Release all buffers upon task completion/failure
SPARK-3002: Create a connection pool and reuse clients across different threads
SPARK-3017: Integration tests and unit tests for connection failures
SPARK-3049: Make sure client doesn't block when server/connection has error(s)
SPARK-3502: SO_RCVBUF and SO_SNDBUF should be bootstrap childOption, not option
SPARK-3503: Disable thread local cache in PooledByteBufAllocator

TODO before mergeable:
[ ] Implement uploadBlock()
[ ] Unit tests for RPC side of code
[ ] Performance testing
aarondav added a commit to aarondav/spark that referenced this pull request Oct 10, 2014
asfgit pushed a commit that referenced this pull request Oct 29, 2014
This PR encapsulates #2330, which is itself a continuation of #2240. The first goal of this PR is to provide an alternate, simpler implementation of the ConnectionManager which is based on Netty.

In addition to this goal, however, we want to resolve [SPARK-3796](https://issues.apache.org/jira/browse/SPARK-3796), which calls for a standalone shuffle service which can be integrated into the YARN NodeManager, Standalone Worker, or on its own. This PR makes the first step in this direction by ensuring that the actual Netty service is as small as possible and extracted from Spark core. Given this, we should be able to construct this standalone jar which can be included in other JVMs without incurring significant dependency or runtime issues. The actual work to ensure that such a standalone shuffle service would work in Spark will be left for a future PR, however.

In order to minimize dependencies and allow for the service to be long-running (possibly much longer-running than Spark, and possibly having to support multiple versions of Spark simultaneously), the entire service has been ported to Java, where we have full control over the binary compatibility of the components and do not depend on the Scala runtime or version.

These issues have been addressed by folding in #2330:

SPARK-3453: Refactor Netty module to use BlockTransferService interface
SPARK-3018: Release all buffers upon task completion/failure
SPARK-3002: Create a connection pool and reuse clients across different threads
SPARK-3017: Integration tests and unit tests for connection failures
SPARK-3049: Make sure client doesn't block when server/connection has error(s)
SPARK-3502: SO_RCVBUF and SO_SNDBUF should be bootstrap childOption, not option
SPARK-3503: Disable thread local cache in PooledByteBufAllocator

TODO before mergeable:
- [x] Implement uploadBlock()
- [x] Unit tests for RPC side of code
- [x] Performance testing (see comments [here](#2753 (comment)))
- [x] Turn OFF by default (currently on for unit testing)

Author: Reynold Xin <rxin@apache.org>
Author: Aaron Davidson <aaron@databricks.com>
Author: cocoatomo <cocoatomo77@gmail.com>
Author: Patrick Wendell <pwendell@gmail.com>
Author: Prashant Sharma <prashant.s@imaginea.com>
Author: Davies Liu <davies.liu@gmail.com>
Author: Anand Avati <avati@redhat.com>

Closes #2753 from aarondav/netty and squashes the following commits:

cadfd28 [Aaron Davidson] Turn netty off by default
d7be11b [Aaron Davidson] Turn netty on by default
4a204b8 [Aaron Davidson] Fail block fetches if client connection fails
2b0d1c0 [Aaron Davidson] 100ch
0c5bca2 [Aaron Davidson] Merge branch 'master' of https://github.com/apache/spark into netty
14e37f7 [Aaron Davidson] Address Reynold's comments
8dfcceb [Aaron Davidson] Merge branch 'master' of https://github.com/apache/spark into netty
322dfc1 [Aaron Davidson] Address Reynold's comments, including major rename
e5675a4 [Aaron Davidson] Fail outstanding RPCs as well
ccd4959 [Aaron Davidson] Don't throw exception if client immediately fails
9da0bc1 [Aaron Davidson] Add RPC unit tests
d236dfd [Aaron Davidson] Remove no-op serializer :)
7b7a26c [Aaron Davidson] Fix Nio compile issue
dd420fd [Aaron Davidson] Merge branch 'master' of https://github.com/apache/spark into netty-test
939f276 [Aaron Davidson] Attempt to make comm. bidirectional
aa58f67 [cocoatomo] [SPARK-3909][PySpark][Doc] A corrupted format in Sphinx documents and building warnings
8dc1ded [cocoatomo] [SPARK-3867][PySpark] ./python/run-tests failed when it run with Python 2.6 and unittest2 is not installed
5b5dbe6 [Prashant Sharma] [SPARK-2924] Required by scala 2.11, only one fun/ctor amongst overriden alternatives, can have default argument(s).
2c5d9dc [Patrick Wendell] HOTFIX: Fix build issue with Akka 2.3.4 upgrade.
020691e [Davies Liu] [SPARK-3886] [PySpark] use AutoBatchedSerializer by default
ae4083a [Anand Avati] [SPARK-2805] Upgrade Akka to 2.3.4
29c6dcf [Aaron Davidson] [SPARK-3453] Netty-based BlockTransferService, extracted from Spark core
f7e7568 [Reynold Xin] Fixed spark.shuffle.io.receiveBuffer setting.
5d98ce3 [Reynold Xin] Flip buffer.
f6c220d [Reynold Xin] Merge with latest master.
407e59a [Reynold Xin] Fix style violation.
a0518c7 [Reynold Xin] Implemented block uploads.
4b18db2 [Reynold Xin] Copy the buffer in fetchBlockSync.
bec4ea2 [Reynold Xin] Removed OIO and added num threads settings.
1bdd7ee [Reynold Xin] Fixed tests.
d68f328 [Reynold Xin] Logging close() in case close() fails.
f63fb4c [Reynold Xin] Add more debug message.
6afc435 [Reynold Xin] Added logging.
c066309 [Reynold Xin] Implement java.io.Closeable interface.
519d64d [Reynold Xin] Mark private package visibility and MimaExcludes.
f0a16e9 [Reynold Xin] Fixed test hanging.
14323a5 [Reynold Xin] Removed BlockManager.getLocalShuffleFromDisk.
b2f3281 [Reynold Xin] Added connection pooling.
d23ed7b [Reynold Xin] Incorporated feedback from Norman: - use same pool for boss and worker - remove ioratio - disable caching of byte buf allocator - childoption sendbuf/receivebuf - fire exception through pipeline
9e0cb87 [Reynold Xin] Fixed BlockClientHandlerSuite
5cd33d7 [Reynold Xin] Fixed style violation.
cb589ec [Reynold Xin] Added more test cases covering cleanup when fault happens in ShuffleBlockFetcherIteratorSuite
1be4e8e [Reynold Xin] Shorten NioManagedBuffer and NettyManagedBuffer class names.
108c9ed [Reynold Xin] Forgot to add TestSerializer to the commit list.
b5c8d1f [Reynold Xin] Fixed ShuffleBlockFetcherIteratorSuite.
064747b [Reynold Xin] Reference count buffers and clean them up properly.
2b44cf1 [Reynold Xin] Added more documentation.
1760d32 [Reynold Xin] Use Epoll.isAvailable in BlockServer as well.
165eab1 [Reynold Xin] [SPARK-3453] Refactor Netty module to use BlockTransferService.