Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NEMO-350] Implement Off-heap SerializedMemoryStore & [NEMO-384] Implement DirectByteBufferInputStream for Off-heap SerializedMemoryStore #222

Merged
merged 65 commits into from
Jun 25, 2019
Merged
Show file tree
Hide file tree
Changes from 63 commits
Commits
Show all changes
65 commits
Select commit Hold shift + click to select a range
26f364f
substitution: DirectByteArrayOutputstream to DirectByteBufferOutputSt…
hy00nc Apr 17, 2019
4ded359
TPC-H application codes
hy00nc Apr 25, 2019
8c557d9
TPC-H application codes
hy00nc Apr 25, 2019
3ec9c8e
TPC-H application codes
hy00nc Apr 25, 2019
1dc1dd8
beam api changed
hy00nc Apr 25, 2019
666cd01
beam tests
hy00nc Apr 25, 2019
05e7033
logging
hy00nc Apr 26, 2019
ceba288
page size experiment 32KB
hy00nc May 4, 2019
0ab5c93
initial implementation
hy00nc May 8, 2019
940bef7
getBufferList fix
hy00nc May 8, 2019
9789d7e
revert and disable clearing list for now
hy00nc May 8, 2019
34848af
tests
hy00nc May 8, 2019
d01c0f0
For writeToFile experiment...
hy00nc May 8, 2019
87409c7
no stream
hy00nc May 9, 2019
6a296a6
logging
hy00nc May 9, 2019
4d349df
use SerializedMemoryStore
hy00nc May 9, 2019
5766022
logging
hy00nc May 9, 2019
e6b271e
logging
hy00nc May 9, 2019
767c182
new edge annotation
hy00nc May 9, 2019
e40064b
Return duplicated buffer when calling getBufferList()
May 10, 2019
c75ec2c
Small fix
May 10, 2019
a1bc8d6
tests for getData()
hy00nc May 10, 2019
64b9df2
Fix toByteArray()
May 10, 2019
47c9832
Merge branch 'offHeapTest' of github.com:hy00nc/incubator-nemo into o…
May 10, 2019
8f77496
revert customized DefaultDataStorePass
hy00nc May 10, 2019
19bad77
Merge branch 'offHeapTest' of https://github.com/hy00nc/incubator-nem…
hy00nc May 10, 2019
e1be1c4
Implement zero-copy file block write
May 10, 2019
597c2f1
Merge branch 'offHeapTest' of github.com:hy00nc/incubator-nemo into o…
May 10, 2019
5339c3a
Fix type conversion error
May 10, 2019
b1b7eb0
default + serializedmemorystore annotatation
hy00nc May 10, 2019
984652b
logging
hy00nc May 11, 2019
fbb577e
ConcurrentLinkedQueue for reuse
hy00nc May 11, 2019
cb8b9e6
allocation size decrease
hy00nc May 11, 2019
930cea0
allocation size increase
hy00nc May 11, 2019
dcd571c
allocation size increase
hy00nc May 11, 2019
4214da3
logging finalize() occurrence
hy00nc May 11, 2019
c4420b6
erase logging
hy00nc May 12, 2019
e0e7c8b
erase logging
hy00nc May 12, 2019
fc69549
on-heap in decode()
hy00nc May 12, 2019
673c98d
preallocation size decrease
hy00nc May 13, 2019
e9fc080
no more toByteArray() and exception when getData() called
hy00nc May 13, 2019
c5efdcf
use DirectByteBufferInputStream in serializedPartition -> nonSerializ…
hy00nc May 13, 2019
a2713f4
initial DirectByteBufferInputStream
hy00nc May 13, 2019
7253e6f
new Constructor for SerializedPartition
hy00nc May 13, 2019
93ecf76
offheap for converting non-serialized data to serialized data
hy00nc May 13, 2019
edd1975
initial edits
hy00nc Jun 18, 2019
53faa33
erase useless codes
hy00nc Jun 18, 2019
58925d3
ByteOutputContext
hy00nc Jun 18, 2019
d56494e
comments and erases
hy00nc Jun 18, 2019
39412ad
test
hy00nc Jun 18, 2019
fa49ca3
file channel use refactoring
hy00nc Jun 18, 2019
fbf97e2
file channel fix
hy00nc Jun 18, 2019
48c6b98
checkstyle
hy00nc Jun 18, 2019
6f4dae5
getData() exception
hy00nc Jun 18, 2019
e5ef592
checkstyle
hy00nc Jun 18, 2019
d0e5478
checkstyle
hy00nc Jun 18, 2019
0cb1f1e
Merge branch 'master' into Off-Heap_PR
hy00nc Jun 18, 2019
17cd1d7
getBuffer() fixed to return the duplicated list of ByteBuffers.
hy00nc Jun 19, 2019
7b4a6bc
fixed to pass BlockStoreTest
hy00nc Jun 19, 2019
617dfd3
Merge branch 'Off-Heap_PR' of https://github.com/hy00nc/incubator-nem…
hy00nc Jun 19, 2019
1b512c4
refactoring
hy00nc Jun 21, 2019
449649d
implements autocloseable
hy00nc Jun 22, 2019
38578df
refactoring
hy00nc Jun 24, 2019
c55d54d
rename class ByteBufferInputStream
hy00nc Jun 25, 2019
bf689d2
minor comment editing
hy00nc Jun 25, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nemo.common;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.List;

/**
* This class is a customized input stream implementation which reads data from
* list of {@link ByteBuffer}. If the {@link ByteBuffer} is direct, it may reside outside
* the normal garbage-collected heap memory.
*/
public class DirectByteBufferInputStream extends InputStream {
private List<ByteBuffer> bufList;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a comment something like:
// The contents of direct buffers may reside outside of the normal garbage-collected heap

I found the above in the ByteBuffer documentation. Is there a way to "guarantee" that the contents reside outside of the heap?

My understanding is that it can be done by using DirectByteBuffer, rather than the abstract class ByteBuffer. It'd be good to make this change, also since the name of this class is DirectByteBufferInputStream.
https://www.javacodegeeks.com/2013/08/which-memory-is-faster-heap-or-bytebuffer-or-direct.html

Copy link
Member Author

@hy00nc hy00nc Jun 21, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you're right, the name of the class implies that this is only for DirectByteBuffer. I guess it is better to change the name of the class to ByteBufferInputStream because DirectByteBuffer class is not public and it can only be constructed by calling allocateDirect() method in ByteBuffer. It is possible to check whether the ByteBuffer is direct or not, but I am afraid it has no much meaning in checking it when it is already written(in on-heap or off-heap) and we are intending to read it. Does it seem okay?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed.
I am also fine with keeping the current class name DirectByteBufferInputStream, as we also have DirectByteBufferOutputStream.

private int current = 0;
private static final int BITMASK = 0xff;

/**
* Default Constructor.
*
* @param bufList is the target data to read.
*/
public DirectByteBufferInputStream(final List<ByteBuffer> bufList) {
this.bufList = bufList;
}

/**
* Reads data from the list of {@code ByteBuffer}s.
*
* @return integer.
* @throws IOException
*/
@Override
public int read() throws IOException {
// Since java's byte is signed type, we have to mask it to make byte
// become unsigned type to properly retrieve `int` from sequence of bytes.
return getBuffer().get() & BITMASK;
}

/**
* Return next non-empty @code{ByteBuffer}.
*
* @return @code{ByteBuffer} to write the data
* @throws IOException when fail to retrieve buffer.
*/
public ByteBuffer getBuffer() throws IOException {
while (current < bufList.size()) {
ByteBuffer buffer = bufList.get(current);
if (buffer.hasRemaining()) {
return buffer;
}
current += 1;
}
throw new EOFException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,29 @@
*/
package org.apache.nemo.common;

import com.google.common.annotations.VisibleForTesting;

import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

/**
* This class is a customized output stream implementation backed by
* {@link ByteBuffer}, which utilizes off heap memory when writing the data.
* Memory is allocated when needed by the specified {@code pageSize}.
* Deletion of {@code dataList}, which is the memory this outputstream holds, occurs
* when the corresponding block is deleted.
* TODO #388: Off-heap memory management (reuse ByteBuffer) - implement reuse.
*/
public final class DirectByteBufferOutputStream extends OutputStream {
hy00nc marked this conversation as resolved.
Show resolved Hide resolved

private LinkedList<ByteBuffer> dataList = new LinkedList<>();
private static final int DEFAULT_PAGE_SIZE = 4096;
private static final int DEFAULT_PAGE_SIZE = 32768; //32KB
hy00nc marked this conversation as resolved.
Show resolved Hide resolved
private final int pageSize;
private ByteBuffer currentBuf;


/**
* Default constructor.
* Sets the {@code pageSize} as default size of 4096 bytes.
Expand All @@ -45,8 +50,9 @@ public DirectByteBufferOutputStream() {
}

/**
* Constructor specifying the {@code size}.
* Sets the {@code pageSize} as {@code size}.
* Constructor which sets {@code pageSize} as specified {@code size}.
* Note that the {@code pageSize} has trade-off between memory fragmentation and
* native memory (de)allocation overhead.
*
* @param size should be a power of 2 and greater than or equal to 4096.
*/
Expand All @@ -62,6 +68,7 @@ public DirectByteBufferOutputStream(final int size) {
/**
* Allocates new {@link ByteBuffer} with the capacity equal to {@code pageSize}.
*/
// TODO #388: Off-heap memory management (reuse ByteBuffer)
private void newLastBuffer() {
dataList.addLast(ByteBuffer.allocateDirect(pageSize));
}
Expand Down Expand Up @@ -120,14 +127,15 @@ public void write(final byte[] b, final int off, final int len) {
}
}


/**
* Creates a byte array that contains the whole content currently written in this output stream.
* Note that this method causes array copy which could degrade performance.
* TODO #384: For performance issue, implement an input stream so that we do not have to use this method.
*
* USED BY TESTS ONLY.
* @return the current contents of this output stream, as byte array.
*/
public byte[] toByteArray() {
@VisibleForTesting
byte[] toByteArray() {
if (dataList.isEmpty()) {
final byte[] byteArray = new byte[0];
return byteArray;
Expand All @@ -140,38 +148,48 @@ public byte[] toByteArray() {
final int arraySize = pageSize * (dataList.size() - 1) + lastBuf.position();
final byte[] byteArray = new byte[arraySize];
int start = 0;
int byteToWrite;

for (final ByteBuffer temp : dataList) {
// ByteBuffer has to be shifted to read mode by calling ByteBuffer.flip(),
// which sets limit to the current position and sets the position to 0.
// Note that capacity remains unchanged.
temp.flip();
byteToWrite = temp.remaining();
temp.get(byteArray, start, byteToWrite);

for (final ByteBuffer buffer : dataList) {
// We use duplicated buffer to read the data so that there is complicated
hy00nc marked this conversation as resolved.
Show resolved Hide resolved
// alteration of position and limit when switching between read and write mode.
final ByteBuffer dupBuffer = buffer.duplicate();
hy00nc marked this conversation as resolved.
Show resolved Hide resolved
dupBuffer.flip();
final int byteToWrite = dupBuffer.remaining();
dupBuffer.get(byteArray, start, byteToWrite);
hy00nc marked this conversation as resolved.
Show resolved Hide resolved
start += byteToWrite;
}
// The limit of the last buffer has to be set to the capacity for additional write.
lastBuf.limit(lastBuf.capacity());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this no longer needed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We changed it to read from duplicated buffer so the original one maintains its limit.


return byteArray;
}

/**
* Returns the list of {@code ByteBuffer}s that contains the written data.
* Note that by calling this method, the existing list of {@code ByteBuffer}s is cleared.
* List of flipped and duplicated {@link ByteBuffer}s are returned which has independent
* position and limit, to reduce erroneous data read/write.
* This function has to be called when intended to read from the start of the list of
* {@link ByteBuffer}s, not for additional write.
*
* @return the {@code LinkedList} of {@code ByteBuffer}s.
*/
public List<ByteBuffer> getBufferListAndClear() {
List<ByteBuffer> result = dataList;
dataList = new LinkedList<>();
for (final ByteBuffer buffer : result) {
buffer.flip();
public List<ByteBuffer> getDirectByteBufferList() {
List<ByteBuffer> result = new ArrayList<>(dataList.size());
for (final ByteBuffer buffer : dataList) {
final ByteBuffer dupBuffer = buffer.duplicate();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto on duplicate().

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please refer to the comment above 👍

dupBuffer.flip();
result.add(dupBuffer);
}
return result;
}

/**
* Returns the size of the data written in this output stream.
*
* @return the size of the data
*/
public int size() {
return pageSize * (dataList.size() - 1) + dataList.getLast().position();
}

/**
* Closing this output stream has no effect.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
*/
package org.apache.nemo.common.coder;

import org.apache.nemo.common.DirectByteArrayOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -82,7 +82,7 @@ private BytesDecoder(final InputStream inputStream) {
public byte[] decode() throws IOException {
// We cannot use inputStream.available() to know the length of bytes to read.
// The available method only returns the number of bytes can be read without blocking.
final DirectByteArrayOutputStream byteOutputStream = new DirectByteArrayOutputStream();
final ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream();
int b = inputStream.read();
while (b != -1) {
byteOutputStream.write(b);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,11 @@ public void testLongReRead() {
}

@Test
public void testGetBufferList() {
public void testGetDirectBufferList() {
String value = RandomStringUtils.randomAlphanumeric(10000);
outputStream.write(value.getBytes());
byte[] totalOutput = outputStream.toByteArray();
List<ByteBuffer> bufList = outputStream.getBufferListAndClear();
List<ByteBuffer> bufList = outputStream.getDirectByteBufferList();
int offset = 0;
int byteToRead;
for (final ByteBuffer temp : bufList) {
Expand Down
Loading