Convert BypassMergeSortShuffleWriter to Java
JoshRosen committed May 26, 2015
1 parent bc1a820 commit 30ef2c8
Showing 3 changed files with 189 additions and 148 deletions.
@@ -0,0 +1,172 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.sort;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;

import scala.Option;
import scala.Product2;
import scala.Tuple2;
import scala.collection.Iterator;

import com.google.common.io.Closeables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.serializer.Serializer;
import org.apache.spark.serializer.SerializerInstance;
import org.apache.spark.storage.*;
import org.apache.spark.util.Utils;

/**
 * This class handles sort-based shuffle's `bypassMergeSort` write path, which is used for shuffles
 * for which no Ordering and no Aggregator is given and the number of partitions is
 * less than `spark.shuffle.sort.bypassMergeThreshold`.
 *
 * This path used to be part of [[ExternalSorter]] but was refactored into its own class in order to
 * reduce code complexity; see SPARK-7855 for more details.
 *
 * There have been proposals to completely remove this code path; see SPARK-6026 for details.
 */
final class BypassMergeSortShuffleWriter<K, V> implements SortShuffleFileWriter<K, V> {

  private final Logger logger = LoggerFactory.getLogger(BypassMergeSortShuffleWriter.class);

  private final int fileBufferSize;
  private final boolean transferToEnabled;
  private final int numPartitions;
  private final BlockManager blockManager;
  private final Partitioner partitioner;
  private final ShuffleWriteMetrics writeMetrics;
  private final Serializer serializer;

  /** Array of file writers, one for each partition */
  private BlockObjectWriter[] partitionWriters;

  public BypassMergeSortShuffleWriter(
      SparkConf conf,
      BlockManager blockManager,
      Partitioner partitioner,
      ShuffleWriteMetrics writeMetrics,
      Serializer serializer) {
    // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
    this.fileBufferSize = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
    this.transferToEnabled = conf.getBoolean("spark.file.transferTo", true);
    this.numPartitions = partitioner.numPartitions();
    this.blockManager = blockManager;
    this.partitioner = partitioner;
    this.writeMetrics = writeMetrics;
    this.serializer = serializer;
  }

  @Override
  public void insertAll(Iterator<Product2<K, V>> records) throws IOException {
    assert (partitionWriters == null);
    if (!records.hasNext()) {
      return;
    }
    final SerializerInstance serInstance = serializer.newInstance();
    final long openStartTime = System.nanoTime();
    partitionWriters = new BlockObjectWriter[numPartitions];
    for (int i = 0; i < numPartitions; i++) {
      final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
        blockManager.diskBlockManager().createTempShuffleBlock();
      final File file = tempShuffleBlockIdPlusFile._2();
      final BlockId blockId = tempShuffleBlockIdPlusFile._1();
      partitionWriters[i] =
        blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics).open();
    }
    // Creating the file to write to and creating a disk writer both involve interacting with
    // the disk, and can take a long time in aggregate when we open many files, so should be
    // included in the shuffle write time.
    writeMetrics.incShuffleWriteTime(System.nanoTime() - openStartTime);

    while (records.hasNext()) {
      final Product2<K, V> record = records.next();
      final K key = record._1();
      partitionWriters[partitioner.getPartition(key)].write(key, record._2());
    }
  }

  @Override
  public long[] writePartitionedFile(
      BlockId blockId,
      TaskContext context,
      File outputFile) throws IOException {
    // Track location of the partition starts in the output file
    final long[] lengths = new long[numPartitions];
    if (partitionWriters == null) {
      // We were passed an empty iterator
      return lengths;
    }
    for (BlockObjectWriter writer : partitionWriters) {
      writer.commitAndClose();
    }
    final FileOutputStream out = new FileOutputStream(outputFile, true);
    final long writeStartTime = System.nanoTime();
    boolean threwException = true;
    try {
      for (int i = 0; i < numPartitions; i++) {
        final FileInputStream in = new FileInputStream(partitionWriters[i].fileSegment().file());
        boolean copyThrewException = true;
        try {
          lengths[i] = Utils.copyStream(in, out, false, transferToEnabled);
          copyThrewException = false;
        } finally {
          Closeables.close(in, copyThrewException);
        }
        if (!blockManager.diskBlockManager().getFile(partitionWriters[i].blockId()).delete()) {
          logger.error("Unable to delete file for partition {}", i);
        }
      }
      threwException = false;
    } finally {
      Closeables.close(out, threwException);
      Option<ShuffleWriteMetrics> maybeWriteMetrics = context.taskMetrics().shuffleWriteMetrics();
      if (maybeWriteMetrics.isDefined()) {
        maybeWriteMetrics.get().incShuffleWriteTime(System.nanoTime() - writeStartTime);
      }
    }
    return lengths;
  }

  @Override
  public void stop() throws IOException {
    if (partitionWriters != null) {
      try {
        final DiskBlockManager diskBlockManager = blockManager.diskBlockManager();
        for (BlockObjectWriter writer : partitionWriters) {
          // This method explicitly does _not_ throw exceptions:
          writer.revertPartialWritesAndClose();
          if (!diskBlockManager.getFile(writer.blockId()).delete()) {
            logger.error("Error while deleting file for block {}", writer.blockId());
          }
        }
      } finally {
        partitionWriters = null;
      }
    }
  }
}
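As a side note on the constructor above: all of this write path's tuning comes from SparkConf. The short standalone sketch below is not part of this commit (the class name BypassMergeConfSketch is invented for illustration); it sets the three properties this path reads and recomputes fileBufferSize the same way the constructor does:

import org.apache.spark.SparkConf;

// Hypothetical, standalone illustration of the settings read by BypassMergeSortShuffleWriter.
public class BypassMergeConfSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf()
      .set("spark.shuffle.file.buffer", "32k")                 // per-partition disk-writer buffer
      .set("spark.file.transferTo", "true")                    // use transferTo() when concatenating partition files
      .set("spark.shuffle.sort.bypassMergeThreshold", "200");  // max number of partitions for this path
    // getSizeAsKb keeps backwards compatibility for unit-less values; the writer multiplies by 1024
    // to turn kibibytes into the byte count handed to getDiskWriter.
    int fileBufferSize = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
    System.out.println("shuffle file buffer = " + fileBufferSize + " bytes");
  }
}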
@@ -15,20 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.spark.shuffle.sort
+package org.apache.spark.shuffle.sort;
 
-import java.io.{IOException, File}
+import java.io.File;
+import java.io.IOException;
 
-import org.apache.spark.TaskContext
-import org.apache.spark.storage.BlockId
+import scala.Product2;
+import scala.collection.Iterator;
+
+import org.apache.spark.TaskContext;
+import org.apache.spark.storage.BlockId;
 
 /**
- * Interface for objects that [[SortShuffleWriter]] uses to write its output files.
+ * Interface for objects that {@link SortShuffleWriter} uses to write its output files.
  */
-private[spark] trait SortShuffleFileWriter[K, V] {
+interface SortShuffleFileWriter<K, V> {
 
-  @throws[IOException]
-  def insertAll(records: Iterator[_ <: Product2[K, V]]): Unit
+  void insertAll(Iterator<Product2<K, V>> records) throws IOException;
 
   /**
    * Write all the data added into this shuffle sorter into a file in the disk store. This is
@@ -39,12 +42,10 @@ def insertAll(records: Iterator[_ <: Product2[K, V]]): Unit
    * @param context a TaskContext for a running Spark task, for us to update shuffle metrics.
    * @return array of lengths, in bytes, of each partition of the file (used by map output tracker)
    */
-  @throws[IOException]
-  def writePartitionedFile(
-      blockId: BlockId,
-      context: TaskContext,
-      outputFile: File): Array[Long]
-
-  @throws[IOException]
-  def stop(): Unit
+  long[] writePartitionedFile(
+      BlockId blockId,
+      TaskContext context,
+      File outputFile) throws IOException;
+
+  void stop() throws IOException;
 }
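The converted interface boils down to a three-step contract: insertAll consumes the map output records, writePartitionedFile concatenates whatever was buffered into a single output file and returns per-partition byte lengths, and stop releases any resources. A minimal, hypothetical no-op implementation, written only to illustrate that contract and not part of this commit, might look like:

package org.apache.spark.shuffle.sort;

import java.io.File;
import java.io.IOException;

import scala.Product2;
import scala.collection.Iterator;

import org.apache.spark.TaskContext;
import org.apache.spark.storage.BlockId;

// Hypothetical test double: drains records and writes nothing.
final class NoOpSortShuffleFileWriter<K, V> implements SortShuffleFileWriter<K, V> {

  @Override
  public void insertAll(Iterator<Product2<K, V>> records) throws IOException {
    // Consume the records without buffering or spilling anything.
    while (records.hasNext()) {
      records.next();
    }
  }

  @Override
  public long[] writePartitionedFile(
      BlockId blockId,
      TaskContext context,
      File outputFile) throws IOException {
    // Nothing was written; the real implementations return one length per partition,
    // but the partition count is not known to this sketch.
    return new long[0];
  }

  @Override
  public void stop() throws IOException {
    // No resources to release.
  }
}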

This file was deleted.
