[SPARK-4307] Initialize FileDescriptor lazily in FileRegion.
Netty's DefaultFileRegion requires a FileDescriptor in its constructor, which means we need to have an open file handle. In very large workloads, this can lead to too many open files because of the way these file descriptors are cleaned up. This pull request creates a new LazyFileRegion that initializes the FileDescriptor only when we send the data for the first time.
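
The heart of the change is deferring file-handle creation until the data is actually transferred. A minimal Scala sketch of that pattern (illustration only; the PR's real implementation is the Java LazyFileRegion shown in the diff below, and the class name here is made up):

import java.io.{File, FileInputStream}
import java.nio.channels.{FileChannel, WritableByteChannel}

// Simplified lazy-open region: the file descriptor is created on the first transfer,
// not in the constructor, so idle regions hold no open files.
class LazySegment(file: File, position: Long, count: Long) {
  private var channel: FileChannel = null

  def transferTo(target: WritableByteChannel, offset: Long): Long = {
    if (channel == null) {
      channel = new FileInputStream(file).getChannel // FD opened only here
    }
    channel.transferTo(position + offset, count - offset, target)
  }

  def close(): Unit = if (channel != null) channel.close()
}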

Author: Reynold Xin <rxin@databricks.com>
Author: Reynold Xin <rxin@apache.org>

Closes #3172 from rxin/lazyFD and squashes the following commits:

0bdcdc6 [Reynold Xin] Added reference to Netty's DefaultFileRegion
d4564ae [Reynold Xin] Added SparkConf to the ctor argument of IndexShuffleBlockManager.
6ed369e [Reynold Xin] Code review feedback.
04cddc8 [Reynold Xin] [SPARK-4307] Initialize FileDescriptor lazily in FileRegion.

(cherry picked from commit ef29a9a)
Signed-off-by: Aaron Davidson <aaron@databricks.com>
rxin authored and aarondav committed Nov 11, 2014
1 parent df8242c commit e9d009d
Showing 16 changed files with 191 additions and 40 deletions.
@@ -40,7 +40,7 @@ class StandaloneWorkerShuffleService(sparkConf: SparkConf, securityManager: Secu
private val useSasl: Boolean = securityManager.isAuthenticationEnabled()

private val transportConf = SparkTransportConf.fromSparkConf(sparkConf)
private val blockHandler = new ExternalShuffleBlockHandler()
private val blockHandler = new ExternalShuffleBlockHandler(transportConf)
private val transportContext: TransportContext = {
val handler = if (useSasl) new SaslRpcHandler(blockHandler, securityManager) else blockHandler
new TransportContext(transportConf, handler)
@@ -27,6 +27,7 @@ import scala.collection.JavaConversions._
import org.apache.spark.{Logging, SparkConf, SparkEnv}
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.FileShuffleBlockManager.ShuffleFileGroup
import org.apache.spark.storage._
@@ -68,6 +69,8 @@ private[spark]
class FileShuffleBlockManager(conf: SparkConf)
extends ShuffleBlockManager with Logging {

private val transportConf = SparkTransportConf.fromSparkConf(conf)

private lazy val blockManager = SparkEnv.get.blockManager

// Turning off shuffle file consolidation causes all shuffle Blocks to get their own file.
@@ -182,13 +185,14 @@ class FileShuffleBlockManager(conf: SparkConf)
val segmentOpt = iter.next.getFileSegmentFor(blockId.mapId, blockId.reduceId)
if (segmentOpt.isDefined) {
val segment = segmentOpt.get
return new FileSegmentManagedBuffer(segment.file, segment.offset, segment.length)
return new FileSegmentManagedBuffer(
transportConf, segment.file, segment.offset, segment.length)
}
}
throw new IllegalStateException("Failed to find shuffle block: " + blockId)
} else {
val file = blockManager.diskBlockManager.getFile(blockId)
new FileSegmentManagedBuffer(file, 0, file.length)
new FileSegmentManagedBuffer(transportConf, file, 0, file.length)
}
}

@@ -22,8 +22,9 @@ import java.nio.ByteBuffer

import com.google.common.io.ByteStreams

import org.apache.spark.SparkEnv
import org.apache.spark.{SparkConf, SparkEnv}
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.storage._

/**
@@ -38,10 +39,12 @@ import org.apache.spark.storage._
// Note: Changes to the format in this file should be kept in sync with
// org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getSortBasedShuffleBlockData().
private[spark]
class IndexShuffleBlockManager extends ShuffleBlockManager {
class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockManager {

private lazy val blockManager = SparkEnv.get.blockManager

private val transportConf = SparkTransportConf.fromSparkConf(conf)

/**
* Mapping to a single shuffleBlockId with reduce ID 0.
* */
@@ -109,6 +112,7 @@ class IndexShuffleBlockManager extends ShuffleBlockManager {
val offset = in.readLong()
val nextOffset = in.readLong()
new FileSegmentManagedBuffer(
transportConf,
getDataFile(blockId.shuffleId, blockId.mapId),
offset,
nextOffset - offset)
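For context, a rough Scala sketch of the index lookup that the two readLong() calls above implement (the index-file layout — one cumulative 8-byte offset per reduce partition — is an assumption for illustration, and segmentFor is a made-up helper name):

import java.io.{DataInputStream, File, FileInputStream}
import com.google.common.io.ByteStreams

// Two adjacent offsets in the index file delimit the requested block in the data file.
def segmentFor(indexFile: File, dataFile: File, reduceId: Int): (File, Long, Long) = {
  val in = new DataInputStream(new FileInputStream(indexFile))
  try {
    ByteStreams.skipFully(in, reduceId * 8L)
    val offset = in.readLong()
    val nextOffset = in.readLong()
    (dataFile, offset, nextOffset - offset)
  } finally {
    in.close()
  }
}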
@@ -25,7 +25,7 @@ import org.apache.spark.shuffle.hash.HashShuffleReader

private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager {

private val indexShuffleBlockManager = new IndexShuffleBlockManager()
private val indexShuffleBlockManager = new IndexShuffleBlockManager(conf)
private val shuffleMapNumber = new ConcurrentHashMap[Int, Int]()

/**
@@ -39,7 +39,7 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll {

override def beforeAll() {
val transportConf = SparkTransportConf.fromSparkConf(conf)
rpcHandler = new ExternalShuffleBlockHandler()
rpcHandler = new ExternalShuffleBlockHandler(transportConf)
val transportContext = new TransportContext(transportConf, rpcHandler)
server = transportContext.createServer()

@@ -31,24 +31,19 @@

import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.network.util.LimitedInputStream;
import org.apache.spark.network.util.TransportConf;

/**
* A {@link ManagedBuffer} backed by a segment in a file.
*/
public final class FileSegmentManagedBuffer extends ManagedBuffer {

/**
* Memory mapping is expensive and can destabilize the JVM (SPARK-1145, SPARK-3889).
* Avoid unless there's a good reason not to.
*/
// TODO: Make this configurable
private static final long MIN_MEMORY_MAP_BYTES = 2 * 1024 * 1024;

private final TransportConf conf;
private final File file;
private final long offset;
private final long length;

public FileSegmentManagedBuffer(File file, long offset, long length) {
public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length) {
this.conf = conf;
this.file = file;
this.offset = offset;
this.length = length;
@@ -65,7 +60,7 @@ public ByteBuffer nioByteBuffer() throws IOException {
try {
channel = new RandomAccessFile(file, "r").getChannel();
// Just copy the buffer if it's sufficiently small, as memory mapping has a high overhead.
if (length < MIN_MEMORY_MAP_BYTES) {
if (length < conf.memoryMapBytes()) {
ByteBuffer buf = ByteBuffer.allocate((int) length);
channel.position(offset);
while (buf.remaining() != 0) {
@@ -134,8 +129,12 @@ public ManagedBuffer release() {

@Override
public Object convertToNetty() throws IOException {
FileChannel fileChannel = new FileInputStream(file).getChannel();
return new DefaultFileRegion(fileChannel, offset, length);
if (conf.lazyFileDescriptor()) {
return new LazyFileRegion(file, offset, length);
} else {
FileChannel fileChannel = new FileInputStream(file).getChannel();
return new DefaultFileRegion(fileChannel, offset, length);
}
}

public File getFile() { return file; }
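The hunk above swaps the hard-coded 2 MB MIN_MEMORY_MAP_BYTES for the configurable conf.memoryMapBytes(). A rough Scala transcription of the resulting read path (illustrative only; the memory-mapping else branch is not visible in the hunk and is assumed from the surrounding comment, and readSegment is a made-up name):

import java.io.{File, IOException, RandomAccessFile}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel.MapMode

def readSegment(file: File, offset: Long, length: Long, memoryMapBytes: Long): ByteBuffer = {
  val channel = new RandomAccessFile(file, "r").getChannel
  try {
    if (length < memoryMapBytes) {
      // Small segment: copy into a heap buffer, since mmap has a high fixed overhead.
      val buf = ByteBuffer.allocate(length.toInt)
      channel.position(offset)
      while (buf.remaining() != 0) {
        if (channel.read(buf) == -1) {
          throw new IOException(s"Reached EOF before reading the full segment from $file")
        }
      }
      buf.flip()
      buf
    } else {
      // Large segment: memory-map it instead of copying.
      channel.map(MapMode.READ_ONLY, offset, length)
    }
  } finally {
    channel.close()
  }
}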
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.buffer;

import java.io.FileInputStream;
import java.io.File;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;

import com.google.common.base.Objects;
import io.netty.channel.FileRegion;
import io.netty.util.AbstractReferenceCounted;

import org.apache.spark.network.util.JavaUtils;

/**
* A FileRegion implementation that only creates the file descriptor when the region is being
* transferred. This cannot be used with Epoll because there is no native support for it.
*
* This is mostly copied from DefaultFileRegion implementation in Netty. In the future, we
* should push this into Netty so the native Epoll transport can support this feature.
*/
public final class LazyFileRegion extends AbstractReferenceCounted implements FileRegion {

private final File file;
private final long position;
private final long count;

private FileChannel channel;

private long numBytesTransferred = 0L;

/**
* @param file file to transfer.
* @param position start position for the transfer.
* @param count number of bytes to transfer starting from position.
*/
public LazyFileRegion(File file, long position, long count) {
this.file = file;
this.position = position;
this.count = count;
}

@Override
protected void deallocate() {
JavaUtils.closeQuietly(channel);
}

@Override
public long position() {
return position;
}

@Override
public long transfered() {
return numBytesTransferred;
}

@Override
public long count() {
return count;
}

@Override
public long transferTo(WritableByteChannel target, long position) throws IOException {
if (channel == null) {
channel = new FileInputStream(file).getChannel();
}

long count = this.count - position;
if (count < 0 || position < 0) {
throw new IllegalArgumentException(
"position out of range: " + position + " (expected: 0 - " + (count - 1) + ')');
}

if (count == 0) {
return 0L;
}

long written = channel.transferTo(this.position + position, count, target);
if (written > 0) {
numBytesTransferred += written;
}
return written;
}

@Override
public String toString() {
return Objects.toStringHelper(this)
.add("file", file)
.add("position", position)
.add("count", count)
.toString();
}
}
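An illustrative usage sketch (not taken from the PR) showing the laziness: no file descriptor exists until the first transferTo() call, and release() closes the channel when the region is deallocated:

import java.io.{ByteArrayOutputStream, File}
import java.nio.channels.Channels
import org.apache.spark.network.buffer.LazyFileRegion

def sendFile(file: File): Unit = {
  val region = new LazyFileRegion(file, 0, file.length) // no FD opened yet
  val sink = Channels.newChannel(new ByteArrayOutputStream())
  var transferred = 0L
  while (transferred < region.count()) { // the FD is opened on the first iteration
    transferred += region.transferTo(sink, transferred)
  }
  region.release() // reference count hits zero, deallocate() closes the channel
}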
@@ -75,4 +75,21 @@ public int connectionTimeoutMs() {
* Only relevant if maxIORetries > 0.
*/
public int ioRetryWaitTime() { return conf.getInt("spark.shuffle.io.retryWaitMs", 5000); }

/**
* Minimum size of a block that we should start using memory map rather than reading in through
* normal IO operations. This prevents Spark from memory mapping very small blocks. In general,
* memory mapping has high overhead for blocks close to or below the page size of the OS.
*/
public int memoryMapBytes() {
return conf.getInt("spark.storage.memoryMapThreshold", 2 * 1024 * 1024);
}

/**
* Whether to initialize shuffle FileDescriptor lazily or not. If true, file descriptors are
* created only when data is going to be transferred. This can reduce the number of open files.
*/
public boolean lazyFileDescriptor() {
return conf.getBoolean("spark.shuffle.io.lazyFD", true);
}
}
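A sketch of how the two new settings surface on the Spark side; the property names are taken from the hunk above, while the values and the SparkTransportConf wiring are shown only for illustration:

import org.apache.spark.SparkConf
import org.apache.spark.network.netty.SparkTransportConf

val sparkConf = new SparkConf()
  .set("spark.storage.memoryMapThreshold", (8 * 1024 * 1024).toString) // mmap only segments >= 8 MB
  .set("spark.shuffle.io.lazyFD", "false") // fall back to eager DefaultFileRegion

val transportConf = SparkTransportConf.fromSparkConf(sparkConf)
transportConf.memoryMapBytes()     // 8388608
transportConf.lazyFileDescriptor() // false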
@@ -63,6 +63,8 @@ public class ChunkFetchIntegrationSuite {
static ManagedBuffer bufferChunk;
static ManagedBuffer fileChunk;

private TransportConf transportConf;

@BeforeClass
public static void setUp() throws Exception {
int bufSize = 100000;
@@ -80,17 +82,18 @@ public static void setUp() throws Exception {
new Random().nextBytes(fileContent);
fp.write(fileContent);
fp.close();
fileChunk = new FileSegmentManagedBuffer(testFile, 10, testFile.length() - 25);

TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
final TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
fileChunk = new FileSegmentManagedBuffer(conf, testFile, 10, testFile.length() - 25);

streamManager = new StreamManager() {
@Override
public ManagedBuffer getChunk(long streamId, int chunkIndex) {
assertEquals(STREAM_ID, streamId);
if (chunkIndex == BUFFER_CHUNK_INDEX) {
return new NioManagedBuffer(buf);
} else if (chunkIndex == FILE_CHUNK_INDEX) {
return new FileSegmentManagedBuffer(testFile, 10, testFile.length() - 25);
return new FileSegmentManagedBuffer(conf, testFile, 10, testFile.length() - 25);
} else {
throw new IllegalArgumentException("Invalid chunk index: " + chunkIndex);
}
@@ -21,6 +21,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import org.apache.spark.network.util.TransportConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -48,8 +49,8 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
private final ExternalShuffleBlockManager blockManager;
private final OneForOneStreamManager streamManager;

public ExternalShuffleBlockHandler() {
this(new OneForOneStreamManager(), new ExternalShuffleBlockManager());
public ExternalShuffleBlockHandler(TransportConf conf) {
this(new OneForOneStreamManager(), new ExternalShuffleBlockManager(conf));
}

/** Enables mocking out the StreamManager and BlockManager. */
@@ -37,6 +37,7 @@
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.network.util.TransportConf;

/**
* Manages converting shuffle BlockIds into physical segments of local files, from a process outside
@@ -56,14 +57,17 @@ public class ExternalShuffleBlockManager {
// Single-threaded Java executor used to perform expensive recursive directory deletion.
private final Executor directoryCleaner;

public ExternalShuffleBlockManager() {
private final TransportConf conf;

public ExternalShuffleBlockManager(TransportConf conf) {
// TODO: Give this thread a name.
this(Executors.newSingleThreadExecutor());
this(conf, Executors.newSingleThreadExecutor());
}

// Allows tests to have more control over when directories are cleaned up.
@VisibleForTesting
ExternalShuffleBlockManager(Executor directoryCleaner) {
ExternalShuffleBlockManager(TransportConf conf, Executor directoryCleaner) {
this.conf = conf;
this.executors = Maps.newConcurrentMap();
this.directoryCleaner = directoryCleaner;
}
@@ -167,7 +171,7 @@ private void deleteExecutorDirs(String[] dirs) {
// TODO: Support consolidated hash shuffle files
private ManagedBuffer getHashBasedShuffleBlockData(ExecutorShuffleInfo executor, String blockId) {
File shuffleFile = getFile(executor.localDirs, executor.subDirsPerLocalDir, blockId);
return new FileSegmentManagedBuffer(shuffleFile, 0, shuffleFile.length());
return new FileSegmentManagedBuffer(conf, shuffleFile, 0, shuffleFile.length());
}

/**
@@ -187,6 +191,7 @@ private ManagedBuffer getSortBasedShuffleBlockData(
long offset = in.readLong();
long nextOffset = in.readLong();
return new FileSegmentManagedBuffer(
conf,
getFile(executor.localDirs, executor.subDirsPerLocalDir,
"shuffle_" + shuffleId + "_" + mapId + "_0.data"),
offset,