[SPARK-4307] Initialize FileDescriptor lazily in FileRegion. #3172

Closed · wants to merge 4 commits

Changes from 1 commit
StandaloneWorkerShuffleService.scala

@@ -40,7 +40,7 @@ class StandaloneWorkerShuffleService(sparkConf: SparkConf, securityManager: Secu
private val useSasl: Boolean = securityManager.isAuthenticationEnabled()

private val transportConf = SparkTransportConf.fromSparkConf(sparkConf)
-  private val blockHandler = new ExternalShuffleBlockHandler()
+  private val blockHandler = new ExternalShuffleBlockHandler(transportConf)
private val transportContext: TransportContext = {
val handler = if (useSasl) new SaslRpcHandler(blockHandler, securityManager) else blockHandler
new TransportContext(transportConf, handler)
FileShuffleBlockManager.scala

@@ -22,6 +22,9 @@ import java.nio.ByteBuffer
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger

+import org.apache.spark.network.netty.SparkTransportConf
Review comment (Contributor): nit: import order

+import org.apache.spark.network.util.TransportConf

import scala.collection.JavaConversions._

import org.apache.spark.{Logging, SparkConf, SparkEnv}
@@ -68,6 +71,8 @@ private[spark]
class FileShuffleBlockManager(conf: SparkConf)
extends ShuffleBlockManager with Logging {

+  private val transportConf = SparkTransportConf.fromSparkConf(conf)

private lazy val blockManager = SparkEnv.get.blockManager

// Turning off shuffle file consolidation causes all shuffle Blocks to get their own file.
@@ -182,13 +187,14 @@ class FileShuffleBlockManager(conf: SparkConf)
val segmentOpt = iter.next.getFileSegmentFor(blockId.mapId, blockId.reduceId)
if (segmentOpt.isDefined) {
val segment = segmentOpt.get
-        return new FileSegmentManagedBuffer(segment.file, segment.offset, segment.length)
+        return new FileSegmentManagedBuffer(
+          segment.file, segment.offset, segment.length, transportConf)
}
}
throw new IllegalStateException("Failed to find shuffle block: " + blockId)
} else {
val file = blockManager.diskBlockManager.getFile(blockId)
-      new FileSegmentManagedBuffer(file, 0, file.length)
+      new FileSegmentManagedBuffer(file, 0, file.length, transportConf)
}
}

IndexShuffleBlockManager.scala

@@ -24,6 +24,7 @@ import com.google.common.io.ByteStreams

import org.apache.spark.SparkEnv
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
+import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.storage._

/**
@@ -42,6 +43,8 @@ class IndexShuffleBlockManager extends ShuffleBlockManager {

private lazy val blockManager = SparkEnv.get.blockManager

+  private val transportConf = SparkTransportConf.fromSparkConf(SparkEnv.get.conf)

/**
* Mapping to a single shuffleBlockId with reduce ID 0.
* */
@@ -111,7 +114,8 @@ class IndexShuffleBlockManager extends ShuffleBlockManager {
new FileSegmentManagedBuffer(
getDataFile(blockId.shuffleId, blockId.mapId),
offset,
-        nextOffset - offset)
+        nextOffset - offset,
+        transportConf)
} finally {
in.close()
}
ExternalShuffleServiceSuite.scala

@@ -39,7 +39,7 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll {

override def beforeAll() {
val transportConf = SparkTransportConf.fromSparkConf(conf)
-    rpcHandler = new ExternalShuffleBlockHandler()
+    rpcHandler = new ExternalShuffleBlockHandler(transportConf)
val transportContext = new TransportContext(transportConf, rpcHandler)
server = transportContext.createServer()

FileSegmentManagedBuffer.java

@@ -31,27 +31,27 @@

import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.network.util.LimitedInputStream;
+import org.apache.spark.network.util.SystemPropertyConfigProvider;
+import org.apache.spark.network.util.TransportConf;

/**
* A {@link ManagedBuffer} backed by a segment in a file.
*/
public final class FileSegmentManagedBuffer extends ManagedBuffer {

-  /**
-   * Memory mapping is expensive and can destabilize the JVM (SPARK-1145, SPARK-3889).
-   * Avoid unless there's a good reason not to.
-   */
-  // TODO: Make this configurable
-  private static final long MIN_MEMORY_MAP_BYTES = 2 * 1024 * 1024;

private final File file;
private final long offset;
private final long length;
+  private final TransportConf conf;

-  public FileSegmentManagedBuffer(File file, long offset, long length) {
+  public FileSegmentManagedBuffer(File file, long offset, long length, TransportConf conf) {
this.file = file;
this.offset = offset;
this.length = length;
+    this.conf = conf;
}

+  public FileSegmentManagedBuffer(File file, long offset, long length) {
Review comment (Contributor): Please add a comment that the other constructor is preferred (this creates an extra object, for instance, and gets configuration from a random place).

+    this(file, offset, length, new TransportConf(new SystemPropertyConfigProvider()));
+  }

@Override
@@ -65,7 +65,7 @@ public ByteBuffer nioByteBuffer() throws IOException {
try {
channel = new RandomAccessFile(file, "r").getChannel();
// Just copy the buffer if it's sufficiently small, as memory mapping has a high overhead.
-      if (length < MIN_MEMORY_MAP_BYTES) {
+      if (length < conf.memoryMapBytes()) {
ByteBuffer buf = ByteBuffer.allocate((int) length);
channel.position(offset);
while (buf.remaining() != 0) {
@@ -134,8 +137,12 @@ public ManagedBuffer release() {

@Override
public Object convertToNetty() throws IOException {
-    FileChannel fileChannel = new FileInputStream(file).getChannel();
-    return new DefaultFileRegion(fileChannel, offset, length);
+    if (conf.lazyFileDescriptor()) {
+      return new LazyFileRegion(file, offset, length);
+    } else {
+      FileChannel fileChannel = new FileInputStream(file).getChannel();
+      return new DefaultFileRegion(fileChannel, offset, length);
+    }
}

public File getFile() { return file; }
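To make the new code path concrete, here is a minimal sketch of a caller exercising convertToNetty() with the classes as they stand after this diff. It is illustrative only, not part of the PR; the ConvertToNettySketch class name and the command-line file argument are invented.

```java
import java.io.File;

import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
import org.apache.spark.network.util.SystemPropertyConfigProvider;
import org.apache.spark.network.util.TransportConf;

public class ConvertToNettySketch {
  public static void main(String[] args) throws Exception {
    TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
    File file = new File(args[0]); // any local file stands in for a shuffle segment
    FileSegmentManagedBuffer buf =
      new FileSegmentManagedBuffer(file, 0, file.length(), conf);
    // With spark.shuffle.io.lazyFD=true (the default) this returns a LazyFileRegion
    // and no file descriptor has been opened yet; with lazyFD=false it returns a
    // DefaultFileRegion backed by an already-open FileChannel.
    Object region = buf.convertToNetty();
    System.out.println(region);
  }
}
```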
LazyFileRegion.java (new file)

@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.buffer;

import java.io.FileInputStream;
import java.io.File;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;

import com.google.common.base.Objects;
import io.netty.channel.FileRegion;
import io.netty.util.AbstractReferenceCounted;

import org.apache.spark.network.util.JavaUtils;

/**
* A FileRegion implementation that only creates the file descriptor when the region is being
* transferred. This cannot be used with Epoll because there is no native support for it.
*/
public final class LazyFileRegion extends AbstractReferenceCounted implements FileRegion {

private final File file;
private final long position;
private final long count;

private FileChannel channel;

private long numBytesTransferred = 0L;

/**
* @param file file to transfer.
* @param position start position for the transfer.
* @param count number of bytes to transfer starting from position.
*/
public LazyFileRegion(File file, long position, long count) {
this.file = file;
this.position = position;
this.count = count;
}

@Override
protected void deallocate() {
JavaUtils.closeQuietly(channel);
}

@Override
public long position() {
return position;
}

@Override
public long transfered() { // misspelling matches Netty 4.0's FileRegion interface
return numBytesTransferred;
}

@Override
public long count() {
return count;
}

@Override
public long transferTo(WritableByteChannel target, long position) throws IOException {
if (channel == null) {
channel = new FileInputStream(file).getChannel();
}

long count = this.count - position;
if (count < 0 || position < 0) {
throw new IllegalArgumentException(
"position out of range: " + position + " (expected: 0 - " + (this.count - 1) + ')');
}

if (count == 0) {
return 0L;
}

long written = channel.transferTo(this.position + position, count, target);
if (written > 0) {
numBytesTransferred += written;
}
return written;
}

@Override
public String toString() {
return Objects.toStringHelper(this)
.add("file", file)
.add("position", position)
.add("count", count)
.toString();
}
}
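For context on how such a region is consumed: a hedged sketch of handing a LazyFileRegion to a Netty channel. The sendFile helper is invented for illustration; the point is that the file descriptor is opened only inside transferTo(), when the event loop actually flushes the write. As the class comment notes, this requires a transport with generic FileRegion support (e.g. NIO), not the native epoll transport.

```java
import java.io.File;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;

import org.apache.spark.network.buffer.LazyFileRegion;

public class LazyRegionWriteSketch {
  /** Queues a zero-copy file transfer; no FD is opened until Netty flushes it. */
  static void sendFile(Channel channel, File file) {
    channel.writeAndFlush(new LazyFileRegion(file, 0, file.length()))
      .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
  }
}
```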
TransportConf.java

@@ -75,4 +75,21 @@ public int connectionTimeoutMs() {
* Only relevant if maxIORetries > 0.
*/
public int ioRetryWaitTime() { return conf.getInt("spark.shuffle.io.retryWaitMs", 5000); }

/**
* Minimum size of a block at which we switch to memory mapping rather than reading it through
* normal IO operations. This prevents Spark from memory mapping very small blocks. In general,
* memory mapping has high overhead for blocks close to or below the page size of the OS.
*/
public int memoryMapBytes() {
return conf.getInt("spark.storage.memoryMapThreshold", 2 * 1024 * 1024);
}

/**
* Whether to initialize shuffle FileDescriptor lazily or not. If true, file descriptors are
* created only when data is going to be transferred. This can reduce the number of open files.
*/
public boolean lazyFileDescriptor() {
return conf.getBoolean("spark.shuffle.io.lazyFD", true);
}
}
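A small usage sketch for the two new keys, assuming (as the test suite further below does) that SystemPropertyConfigProvider reads plain JVM system properties; the TransportConfSketch class name is invented.

```java
import org.apache.spark.network.util.SystemPropertyConfigProvider;
import org.apache.spark.network.util.TransportConf;

public class TransportConfSketch {
  public static void main(String[] args) {
    // Override the defaults (lazyFD=true, memoryMapThreshold=2MB) via system properties.
    System.setProperty("spark.shuffle.io.lazyFD", "false");
    System.setProperty("spark.storage.memoryMapThreshold", String.valueOf(8 * 1024 * 1024));

    TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
    System.out.println(conf.lazyFileDescriptor()); // false
    System.out.println(conf.memoryMapBytes());     // 8388608
  }
}
```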
ExternalShuffleBlockHandler.java

@@ -21,6 +21,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
+import org.apache.spark.network.util.TransportConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -48,8 +49,8 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
private final ExternalShuffleBlockManager blockManager;
private final OneForOneStreamManager streamManager;

-  public ExternalShuffleBlockHandler() {
-    this(new OneForOneStreamManager(), new ExternalShuffleBlockManager());
+  public ExternalShuffleBlockHandler(TransportConf conf) {
+    this(new OneForOneStreamManager(), new ExternalShuffleBlockManager(conf));
}

/** Enables mocking out the StreamManager and BlockManager. */
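To show where the new constructor argument comes from, a minimal bootstrap sketch mirroring the wiring in StandaloneWorkerShuffleService and ExternalShuffleServiceSuite above. ShuffleServiceBootstrapSketch is an invented name, the import paths for TransportContext and TransportServer are assumed from Spark's network-common layout, and a real deployment would wrap the handler in SaslRpcHandler when authentication is enabled.

```java
import org.apache.spark.network.TransportContext;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
import org.apache.spark.network.util.SystemPropertyConfigProvider;
import org.apache.spark.network.util.TransportConf;

public class ShuffleServiceBootstrapSketch {
  public static void main(String[] args) {
    TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
    // The handler now takes the TransportConf so the buffers it serves can
    // consult memoryMapBytes() and lazyFileDescriptor().
    ExternalShuffleBlockHandler handler = new ExternalShuffleBlockHandler(conf);
    TransportContext context = new TransportContext(conf, handler);
    TransportServer server = context.createServer(); // binds and starts listening
  }
}
```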
ExternalShuffleBlockManager.java

@@ -30,13 +30,15 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.collect.Maps;
+import org.apache.spark.network.util.SystemPropertyConfigProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.util.JavaUtils;
+import org.apache.spark.network.util.TransportConf;

/**
* Manages converting shuffle BlockIds into physical segments of local files, from a process outside
@@ -56,16 +58,24 @@ public class ExternalShuffleBlockManager {
// Single-threaded Java executor used to perform expensive recursive directory deletion.
private final Executor directoryCleaner;

-  public ExternalShuffleBlockManager() {
+  private final TransportConf conf;
+
+  public ExternalShuffleBlockManager(TransportConf conf) {
    // TODO: Give this thread a name.
-    this(Executors.newSingleThreadExecutor());
+    this(Executors.newSingleThreadExecutor(), conf);
}

// Allows tests to have more control over when directories are cleaned up.
@VisibleForTesting
-  ExternalShuffleBlockManager(Executor directoryCleaner) {
+  ExternalShuffleBlockManager(Executor directoryCleaner, TransportConf conf) {
Review comment (Contributor): nit: please make conf first parameter to be consistent with other classes

this.executors = Maps.newConcurrentMap();
this.directoryCleaner = directoryCleaner;
+    this.conf = conf;
}

+  @VisibleForTesting
Review comment (Contributor): we probably don't need 2 VisibleForTesting constructors, do we? If someone wants to provide an explicit Executor, they can damn well create their own TransportConf. This is only done in a single unit test anyway.

+  ExternalShuffleBlockManager(Executor directoryCleaner) {
+    this(directoryCleaner, new TransportConf(new SystemPropertyConfigProvider()));
+  }

/** Registers a new Executor with all the configuration we need to find its shuffle files. */
@@ -167,7 +177,7 @@ private void deleteExecutorDirs(String[] dirs) {
// TODO: Support consolidated hash shuffle files
private ManagedBuffer getHashBasedShuffleBlockData(ExecutorShuffleInfo executor, String blockId) {
File shuffleFile = getFile(executor.localDirs, executor.subDirsPerLocalDir, blockId);
-    return new FileSegmentManagedBuffer(shuffleFile, 0, shuffleFile.length());
+    return new FileSegmentManagedBuffer(shuffleFile, 0, shuffleFile.length(), conf);
}

/**
@@ -190,7 +200,8 @@ private ManagedBuffer getSortBasedShuffleBlockData(
getFile(executor.localDirs, executor.subDirsPerLocalDir,
"shuffle_" + shuffleId + "_" + mapId + "_0.data"),
offset,
-        nextOffset - offset);
+        nextOffset - offset,
+        conf);
} catch (IOException e) {
throw new RuntimeException("Failed to open file: " + indexFile, e);
} finally {
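For readers unfamiliar with the sort-shuffle layout this hunk depends on: each map output's index file is a sequence of 8-byte cumulative offsets (entry 0 is always 0), so a reduce partition's byte range in the .data file is two adjacent longs. A minimal sketch of that lookup, with the IndexLookupSketch class invented for illustration:

```java
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

import com.google.common.io.ByteStreams;

public class IndexLookupSketch {
  /** Returns {offset, nextOffset}: the byte range of reduceId's block in the data file. */
  static long[] lookUp(File indexFile, int reduceId) throws IOException {
    DataInputStream in = new DataInputStream(new FileInputStream(indexFile));
    try {
      ByteStreams.skipFully(in, reduceId * 8L); // one 8-byte long per partition
      long offset = in.readLong();
      long nextOffset = in.readLong();
      return new long[] { offset, nextOffset };
    } finally {
      in.close();
    }
  }
}
```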
ExternalShuffleBlockManagerSuite.java

@@ -22,6 +22,8 @@
import java.io.InputStreamReader;

import com.google.common.io.CharStreams;
+import org.apache.spark.network.util.SystemPropertyConfigProvider;
+import org.apache.spark.network.util.TransportConf;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -37,6 +39,8 @@ public class ExternalShuffleBlockManagerSuite {

static TestShuffleDataContext dataContext;

+  static TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());

@BeforeClass
public static void beforeAll() throws IOException {
dataContext = new TestShuffleDataContext(2, 5);
@@ -56,7 +60,7 @@ public static void afterAll() {

@Test
public void testBadRequests() {
-    ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager();
+    ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf);
// Unregistered executor
try {
manager.getBlockData("app0", "exec1", "shuffle_1_1_0");
@@ -87,7 +91,7 @@ public void testBadRequests() {

@Test
public void testSortShuffleBlocks() throws IOException {
-    ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager();
+    ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf);
manager.registerExecutor("app0", "exec0",
dataContext.createExecutorInfo("org.apache.spark.shuffle.sort.SortShuffleManager"));

@@ -106,7 +110,7 @@ public void testHashShuffleBlocks() throws IOException {

@Test
public void testHashShuffleBlocks() throws IOException {
-    ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager();
+    ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf);
manager.registerExecutor("app0", "exec0",
dataContext.createExecutorInfo("org.apache.spark.shuffle.hash.HashShuffleManager"));
