[SPARK-4307] Initialize FileDescriptor lazily in FileRegion.
rxin committed Nov 9, 2014
1 parent 4af5c7e commit 04cddc8
Showing 12 changed files with 184 additions and 29 deletions.
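
In short, the change threads a TransportConf through the shuffle read path, makes the memory-map threshold configurable via spark.storage.memoryMapThreshold, and adds spark.shuffle.io.lazyFD (default true), which makes FileSegmentManagedBuffer.convertToNetty() return a new LazyFileRegion that defers opening the file descriptor until the region is actually transferred. A minimal sketch of the resulting behavior, assuming the SystemPropertyConfigProvider used in the tests below as the config source and a hypothetical shuffle file path:

    import java.io.File;
    import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
    import org.apache.spark.network.util.SystemPropertyConfigProvider;
    import org.apache.spark.network.util.TransportConf;

    public class LazyFdSketch {
      public static void main(String[] args) throws Exception {
        TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
        File data = new File("/tmp/shuffle_0_0_0.data");  // hypothetical path, for illustration only
        FileSegmentManagedBuffer buf = new FileSegmentManagedBuffer(data, 0, data.length(), conf);
        // With spark.shuffle.io.lazyFD=true (the default) this returns a LazyFileRegion and opens
        // no file descriptor yet; with it set to false it opens the file eagerly and returns
        // Netty's DefaultFileRegion.
        Object region = buf.convertToNetty();
        System.out.println(region.getClass().getSimpleName());
      }
    }

Inside Spark the TransportConf comes from SparkTransportConf.fromSparkConf(sparkConf), so both keys are picked up from the application's SparkConf; most of the diff below is plumbing that TransportConf parameter into existing constructors.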
@@ -40,7 +40,7 @@ class StandaloneWorkerShuffleService(sparkConf: SparkConf, securityManager: Secu
private val useSasl: Boolean = securityManager.isAuthenticationEnabled()

private val transportConf = SparkTransportConf.fromSparkConf(sparkConf)
private val blockHandler = new ExternalShuffleBlockHandler()
private val blockHandler = new ExternalShuffleBlockHandler(transportConf)
private val transportContext: TransportContext = {
val handler = if (useSasl) new SaslRpcHandler(blockHandler, securityManager) else blockHandler
new TransportContext(transportConf, handler)
@@ -22,6 +22,9 @@ import java.nio.ByteBuffer
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger

import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.util.TransportConf

import scala.collection.JavaConversions._

import org.apache.spark.{Logging, SparkConf, SparkEnv}
@@ -68,6 +71,8 @@ private[spark]
class FileShuffleBlockManager(conf: SparkConf)
extends ShuffleBlockManager with Logging {

private val transportConf = SparkTransportConf.fromSparkConf(conf)

private lazy val blockManager = SparkEnv.get.blockManager

// Turning off shuffle file consolidation causes all shuffle Blocks to get their own file.
@@ -182,13 +187,14 @@ class FileShuffleBlockManager(conf: SparkConf)
val segmentOpt = iter.next.getFileSegmentFor(blockId.mapId, blockId.reduceId)
if (segmentOpt.isDefined) {
val segment = segmentOpt.get
return new FileSegmentManagedBuffer(segment.file, segment.offset, segment.length)
return new FileSegmentManagedBuffer(
segment.file, segment.offset, segment.length, transportConf)
}
}
throw new IllegalStateException("Failed to find shuffle block: " + blockId)
} else {
val file = blockManager.diskBlockManager.getFile(blockId)
new FileSegmentManagedBuffer(file, 0, file.length)
new FileSegmentManagedBuffer(file, 0, file.length, transportConf)
}
}

@@ -24,6 +24,7 @@ import com.google.common.io.ByteStreams

import org.apache.spark.SparkEnv
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.storage._

/**
@@ -42,6 +43,8 @@ class IndexShuffleBlockManager extends ShuffleBlockManager {

private lazy val blockManager = SparkEnv.get.blockManager

private val transportConf = SparkTransportConf.fromSparkConf(SparkEnv.get.conf)

/**
* Mapping to a single shuffleBlockId with reduce ID 0.
* */
@@ -111,7 +114,8 @@ class IndexShuffleBlockManager extends ShuffleBlockManager {
new FileSegmentManagedBuffer(
getDataFile(blockId.shuffleId, blockId.mapId),
offset,
nextOffset - offset)
nextOffset - offset,
transportConf)
} finally {
in.close()
}
@@ -39,7 +39,7 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll {

override def beforeAll() {
val transportConf = SparkTransportConf.fromSparkConf(conf)
rpcHandler = new ExternalShuffleBlockHandler()
rpcHandler = new ExternalShuffleBlockHandler(transportConf)
val transportContext = new TransportContext(transportConf, rpcHandler)
server = transportContext.createServer()

@@ -31,27 +31,27 @@

import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.network.util.LimitedInputStream;
import org.apache.spark.network.util.SystemPropertyConfigProvider;
import org.apache.spark.network.util.TransportConf;

/**
* A {@link ManagedBuffer} backed by a segment in a file.
*/
public final class FileSegmentManagedBuffer extends ManagedBuffer {

/**
* Memory mapping is expensive and can destabilize the JVM (SPARK-1145, SPARK-3889).
* Avoid unless there's a good reason not to.
*/
// TODO: Make this configurable
private static final long MIN_MEMORY_MAP_BYTES = 2 * 1024 * 1024;

private final File file;
private final long offset;
private final long length;
private final TransportConf conf;

public FileSegmentManagedBuffer(File file, long offset, long length) {
public FileSegmentManagedBuffer(File file, long offset, long length, TransportConf conf) {
this.file = file;
this.offset = offset;
this.length = length;
this.conf = conf;
}

public FileSegmentManagedBuffer(File file, long offset, long length) {
this(file, offset, length, new TransportConf(new SystemPropertyConfigProvider()));
}

@Override
@@ -65,7 +65,7 @@ public ByteBuffer nioByteBuffer() throws IOException {
try {
channel = new RandomAccessFile(file, "r").getChannel();
// Just copy the buffer if it's sufficiently small, as memory mapping has a high overhead.
if (length < MIN_MEMORY_MAP_BYTES) {
if (length < conf.memoryMapBytes()) {
ByteBuffer buf = ByteBuffer.allocate((int) length);
channel.position(offset);
while (buf.remaining() != 0) {
@@ -134,8 +134,12 @@ public ManagedBuffer release() {

@Override
public Object convertToNetty() throws IOException {
FileChannel fileChannel = new FileInputStream(file).getChannel();
return new DefaultFileRegion(fileChannel, offset, length);
if (conf.lazyFileDescriptor()) {
return new LazyFileRegion(file, offset, length);
} else {
FileChannel fileChannel = new FileInputStream(file).getChannel();
return new DefaultFileRegion(fileChannel, offset, length);
}
}

public File getFile() { return file; }
@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.buffer;

import java.io.FileInputStream;
import java.io.File;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;

import com.google.common.base.Objects;
import io.netty.channel.FileRegion;
import io.netty.util.AbstractReferenceCounted;

import org.apache.spark.network.util.JavaUtils;

/**
* A FileRegion implementation that only creates the file descriptor when the region is being
* transferred. This cannot be used with Epoll because there is no native support for it.
*/
public final class LazyFileRegion extends AbstractReferenceCounted implements FileRegion {

private final File file;
private final long position;
private final long count;

private FileChannel channel;

private long numBytesTransferred = 0L;

/**
* @param file file to transfer.
* @param position start position for the transfer.
* @param count number of bytes to transfer starting from position.
*/
public LazyFileRegion(File file, long position, long count) {
this.file = file;
this.position = position;
this.count = count;
}

@Override
protected void deallocate() {
JavaUtils.closeQuietly(channel);
}

@Override
public long position() {
return position;
}

@Override
public long transfered() {
return numBytesTransferred;
}

@Override
public long count() {
return count;
}

@Override
public long transferTo(WritableByteChannel target, long position) throws IOException {
if (channel == null) {
channel = new FileInputStream(file).getChannel();
}

long count = this.count - position;
if (count < 0 || position < 0) {
throw new IllegalArgumentException(
"position out of range: " + position + " (expected: 0 - " + (count - 1) + ')');
}

if (count == 0) {
return 0L;
}

long written = channel.transferTo(this.position + position, count, target);
if (written > 0) {
numBytesTransferred += written;
}
return written;
}

@Override
public String toString() {
return Objects.toStringHelper(this)
.add("file", file)
.add("position", position)
.add("count", count)
.toString();
}
}
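
A brief standalone sketch of the lazy behavior, assuming a local file that already exists (the path below is hypothetical): constructing the region opens nothing, the FileChannel is created on the first transferTo() call, and it is closed when the reference count drops to zero.

    import java.io.ByteArrayOutputStream;
    import java.io.File;
    import java.nio.channels.Channels;
    import java.nio.channels.WritableByteChannel;

    import org.apache.spark.network.buffer.LazyFileRegion;

    public class LazyFileRegionSketch {
      public static void main(String[] args) throws Exception {
        File file = new File("/tmp/example.data");            // hypothetical existing file
        // No file descriptor is opened here.
        LazyFileRegion region = new LazyFileRegion(file, 0, file.length());

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        WritableByteChannel sink = Channels.newChannel(out);
        // The underlying FileChannel is created lazily inside this call.
        long written = region.transferTo(sink, 0);
        System.out.println("wrote " + written + " of " + region.count() + " bytes");

        region.release();  // reference count hits zero -> deallocate() closes the channel quietly
      }
    }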
@@ -75,4 +75,21 @@ public int connectionTimeoutMs() {
* Only relevant if maxIORetries > 0.
*/
public int ioRetryWaitTime() { return conf.getInt("spark.shuffle.io.retryWaitMs", 5000); }

/**
* Minimum size of a block above which we use memory mapping rather than reading it in through
* normal IO operations. This prevents Spark from memory mapping very small blocks. In general,
* memory mapping has high overhead for blocks close to or below the page size of the OS.
*/
public int memoryMapBytes() {
return conf.getInt("spark.storage.memoryMapThreshold", 2 * 1024 * 1024);
}

/**
* Whether to initialize shuffle FileDescriptors lazily. If true, file descriptors are
* created only when data is going to be transferred. This can reduce the number of open files.
*/
public boolean lazyFileDescriptor() {
return conf.getBoolean("spark.shuffle.io.lazyFD", true);
}
}
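
A small sketch of reading the two new settings, assuming the SystemPropertyConfigProvider used elsewhere in this commit as the configuration source; inside Spark these values normally arrive via SparkTransportConf.fromSparkConf:

    import org.apache.spark.network.util.SystemPropertyConfigProvider;
    import org.apache.spark.network.util.TransportConf;

    public class TransportConfSketch {
      public static void main(String[] args) {
        // Override the defaults (2 MB and true) before the conf is read.
        System.setProperty("spark.storage.memoryMapThreshold", String.valueOf(8 * 1024 * 1024));
        System.setProperty("spark.shuffle.io.lazyFD", "false");

        TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
        System.out.println(conf.memoryMapBytes());      // 8388608
        System.out.println(conf.lazyFileDescriptor());  // false
      }
    }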
@@ -21,6 +21,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import org.apache.spark.network.util.TransportConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -48,8 +49,8 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
private final ExternalShuffleBlockManager blockManager;
private final OneForOneStreamManager streamManager;

public ExternalShuffleBlockHandler() {
this(new OneForOneStreamManager(), new ExternalShuffleBlockManager());
public ExternalShuffleBlockHandler(TransportConf conf) {
this(new OneForOneStreamManager(), new ExternalShuffleBlockManager(conf));
}

/** Enables mocking out the StreamManager and BlockManager. */
@@ -30,13 +30,15 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.collect.Maps;
import org.apache.spark.network.util.SystemPropertyConfigProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.network.util.TransportConf;

/**
* Manages converting shuffle BlockIds into physical segments of local files, from a process outside
@@ -56,16 +58,24 @@ public class ExternalShuffleBlockManager {
// Single-threaded Java executor used to perform expensive recursive directory deletion.
private final Executor directoryCleaner;

public ExternalShuffleBlockManager() {
private final TransportConf conf;

public ExternalShuffleBlockManager(TransportConf conf) {
// TODO: Give this thread a name.
this(Executors.newSingleThreadExecutor());
this(Executors.newSingleThreadExecutor(), conf);
}

// Allows tests to have more control over when directories are cleaned up.
@VisibleForTesting
ExternalShuffleBlockManager(Executor directoryCleaner) {
ExternalShuffleBlockManager(Executor directoryCleaner, TransportConf conf) {
this.executors = Maps.newConcurrentMap();
this.directoryCleaner = directoryCleaner;
this.conf = conf;
}

@VisibleForTesting
ExternalShuffleBlockManager(Executor directoryCleaner) {
this(directoryCleaner, new TransportConf(new SystemPropertyConfigProvider()));
}

/** Registers a new Executor with all the configuration we need to find its shuffle files. */
@@ -167,7 +177,7 @@ private void deleteExecutorDirs(String[] dirs) {
// TODO: Support consolidated hash shuffle files
private ManagedBuffer getHashBasedShuffleBlockData(ExecutorShuffleInfo executor, String blockId) {
File shuffleFile = getFile(executor.localDirs, executor.subDirsPerLocalDir, blockId);
return new FileSegmentManagedBuffer(shuffleFile, 0, shuffleFile.length());
return new FileSegmentManagedBuffer(shuffleFile, 0, shuffleFile.length(), conf);
}

/**
@@ -190,7 +200,8 @@ private ManagedBuffer getSortBasedShuffleBlockData(
getFile(executor.localDirs, executor.subDirsPerLocalDir,
"shuffle_" + shuffleId + "_" + mapId + "_0.data"),
offset,
nextOffset - offset);
nextOffset - offset,
conf);
} catch (IOException e) {
throw new RuntimeException("Failed to open file: " + indexFile, e);
} finally {
@@ -22,6 +22,8 @@
import java.io.InputStreamReader;

import com.google.common.io.CharStreams;
import org.apache.spark.network.util.SystemPropertyConfigProvider;
import org.apache.spark.network.util.TransportConf;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -37,6 +39,8 @@ public class ExternalShuffleBlockManagerSuite {

static TestShuffleDataContext dataContext;

static TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());

@BeforeClass
public static void beforeAll() throws IOException {
dataContext = new TestShuffleDataContext(2, 5);
@@ -56,7 +60,7 @@ public static void afterAll() {

@Test
public void testBadRequests() {
ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager();
ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf);
// Unregistered executor
try {
manager.getBlockData("app0", "exec1", "shuffle_1_1_0");
@@ -87,7 +91,7 @@ public void testBadRequests() {

@Test
public void testSortShuffleBlocks() throws IOException {
ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager();
ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf);
manager.registerExecutor("app0", "exec0",
dataContext.createExecutorInfo("org.apache.spark.shuffle.sort.SortShuffleManager"));

@@ -106,7 +110,7 @@ public void testSortShuffleBlocks() throws IOException {

@Test
public void testHashShuffleBlocks() throws IOException {
ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager();
ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf);
manager.registerExecutor("app0", "exec0",
dataContext.createExecutorInfo("org.apache.spark.shuffle.hash.HashShuffleManager"));

