Merge branch 'master' into palantir-master
Robert Kruszewski committed Feb 18, 2018
2 parents bbe0160 + 3ee3b2a commit 45101f6
Showing 179 changed files with 3,787 additions and 1,561 deletions.
4 changes: 3 additions & 1 deletion R/pkg/R/functions.R
@@ -1026,7 +1026,9 @@ setMethod("last_day",
})

#' @details
-#' \code{length}: Computes the length of a given string or binary column.
+#' \code{length}: Computes the character length of string data or the number of
+#' bytes of binary data. The length of string data includes trailing spaces;
+#' the length of binary data includes binary zeros.
#'
#' @rdname column_string_functions
#' @aliases length length,Column-method
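A quick illustration of the documented semantics (a hedged sketch, not part of this commit; assumes a running SparkSession named spark):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder().master("local").getOrCreate();
// Trailing spaces and binary zeros both count toward length().
Dataset<Row> df = spark.sql(
    "SELECT length('Spark ') AS strLen, length(X'0A0B00') AS binLen");
df.show();  // strLen = 6 (trailing space counted), binLen = 3 (zero byte counted)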
@@ -60,6 +60,8 @@ public static int hashUnsafeWords(Object base, long offset, int lengthInBytes, int seed) {
}

public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes, int seed) {
// This is not compatible with the original and other implementations,
// but it is kept for backward compatibility with components that existed before Spark 2.3.
assert (lengthInBytes >= 0): "lengthInBytes cannot be negative";
int lengthAligned = lengthInBytes - lengthInBytes % 4;
int h1 = hashBytesByInt(base, offset, lengthAligned, seed);
@@ -71,6 +73,20 @@ public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes, int seed) {
return fmix(h1, lengthInBytes);
}

public static int hashUnsafeBytes2(Object base, long offset, int lengthInBytes, int seed) {
// This is compatible with the original and other implementations.
// Use this method for new components added after Spark 2.3.
assert (lengthInBytes >= 0): "lengthInBytes cannot be negative";
int lengthAligned = lengthInBytes - lengthInBytes % 4;
int h1 = hashBytesByInt(base, offset, lengthAligned, seed);
int k1 = 0;
for (int i = lengthAligned, shift = 0; i < lengthInBytes; i++, shift += 8) {
k1 ^= (Platform.getByte(base, offset + i) & 0xFF) << shift;
}
h1 ^= mixK1(k1);
return fmix(h1, lengthInBytes);
}

private static int hashBytesByInt(Object base, long offset, int lengthInBytes, int seed) {
assert (lengthInBytes % 4 == 0);
int h1 = seed;
@@ -60,6 +60,8 @@ public static int hashUnsafeWords(Object base, long offset, int lengthInBytes, int seed) {
}

public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes, int seed) {
// This is not compatible with the original and other implementations,
// but it is kept for backward compatibility with components that existed before Spark 2.3.
assert (lengthInBytes >= 0): "lengthInBytes cannot be negative";
int lengthAligned = lengthInBytes - lengthInBytes % 4;
int h1 = hashBytesByInt(base, offset, lengthAligned, seed);
@@ -71,6 +73,20 @@ public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes, int seed) {
return fmix(h1, lengthInBytes);
}

public static int hashUnsafeBytes2(Object base, long offset, int lengthInBytes, int seed) {
// This is compatible with the original and other implementations.
// Use this method for new components added after Spark 2.3.
assert (lengthInBytes >= 0): "lengthInBytes cannot be negative";
int lengthAligned = lengthInBytes - lengthInBytes % 4;
int h1 = hashBytesByInt(base, offset, lengthAligned, seed);
int k1 = 0;
for (int i = lengthAligned, shift = 0; i < lengthInBytes; i++, shift += 8) {
k1 ^= (Platform.getByte(base, offset + i) & 0xFF) << shift;
}
h1 ^= mixK1(k1);
return fmix(h1, lengthInBytes);
}

private static int hashBytesByInt(Object base, long offset, int lengthInBytes, int seed) {
assert (lengthInBytes % 4 == 0);
int h1 = seed;
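Why hashUnsafeBytes2 agrees with the reference implementation while hashUnsafeBytes does not comes down to the trailing bytes past the last 4-byte word: the reference MurmurHash3 folds them into a single little-endian word and mixes it once, whereas the pre-2.3 method mixed each trailing byte separately. (The hunk above appears twice because the commit applies the same change to two identical copies of the class.) A minimal standalone sketch of the compatible tail treatment, with mixK1 reproduced from the standard Murmur3 constants; this helper is a sketch, not code from this commit:

// Standard Murmur3 k1 mixing step (reference constants).
static int mixK1(int k1) {
  k1 *= 0xcc9e2d51;
  k1 = Integer.rotateLeft(k1, 15);
  k1 *= 0x1b873593;
  return k1;
}

// Compatible tail handling, as in hashUnsafeBytes2 above: accumulate all
// trailing bytes into one little-endian word, then mix once.
static int tailCompatible(byte[] tail, int h1) {
  int k1 = 0;
  for (int i = 0, shift = 0; i < tail.length; i++, shift += 8) {
    k1 ^= (tail[i] & 0xFF) << shift;  // & 0xFF avoids sign extension
  }
  return h1 ^ mixK1(k1);
}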
@@ -46,9 +46,12 @@ private boolean shouldPool(long size) {

@Override
public MemoryBlock allocate(long size) throws OutOfMemoryError {
-if (shouldPool(size)) {
+int numWords = (int) ((size + 7) / 8);
+long alignedSize = numWords * 8L;
+assert (alignedSize >= size);
+if (shouldPool(alignedSize)) {
synchronized (this) {
-final LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(size);
+final LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(alignedSize);
if (pool != null) {
while (!pool.isEmpty()) {
final WeakReference<long[]> arrayReference = pool.pop();
@@ -62,11 +65,11 @@ public MemoryBlock allocate(long size) throws OutOfMemoryError {
return memory;
}
}
-bufferPoolsBySize.remove(size);
+bufferPoolsBySize.remove(alignedSize);
}
}
}
-long[] array = new long[(int) ((size + 7) / 8)];
+long[] array = new long[numWords];
MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
@@ -98,12 +101,13 @@ public void free(MemoryBlock memory) {
long[] array = (long[]) memory.obj;
memory.setObjAndOffset(null, 0);

-if (shouldPool(size)) {
+long alignedSize = ((size + 7) / 8) * 8;
+if (shouldPool(alignedSize)) {
synchronized (this) {
-LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(size);
+LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(alignedSize);
if (pool == null) {
pool = new LinkedList<>();
-bufferPoolsBySize.put(size, pool);
+bufferPoolsBySize.put(alignedSize, pool);
}
pool.add(new WeakReference<>(array));
}
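The net effect: pool buckets are now keyed by the word-aligned size rather than the requested size, so any two requests that need the same number of 8-byte longs share a bucket and can reuse one array. A small sketch of the arithmetic, worked for the sizes used by the new test below:

// Round a requested size up to a whole number of 8-byte words, as allocate()
// and free() now both do before consulting bufferPoolsBySize.
long size1 = 1024 * 1024 + 1;
long size2 = 1024 * 1024 + 7;
long aligned1 = ((size1 + 7) / 8) * 8;  // 1048584
long aligned2 = ((size2 + 7) / 8) * 8;  // 1048584 -- same pool bucket
assert aligned1 == aligned2;            // so the backing long[] can be reused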
@@ -17,6 +17,7 @@

package org.apache.spark.unsafe;

import org.apache.spark.unsafe.memory.HeapMemoryAllocator;
import org.apache.spark.unsafe.memory.MemoryAllocator;
import org.apache.spark.unsafe.memory.MemoryBlock;

@@ -134,4 +135,26 @@ public void memoryDebugFillEnabledInTest() {
MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
MemoryAllocator.UNSAFE.free(offheap);
}

@Test
public void heapMemoryReuse() {
MemoryAllocator heapMem = new HeapMemoryAllocator();
// The size is less than `HeapMemoryAllocator.POOLING_THRESHOLD_BYTES`,
// so new memory is allocated every time.
MemoryBlock onheap1 = heapMem.allocate(513);
Object obj1 = onheap1.getBaseObject();
heapMem.free(onheap1);
MemoryBlock onheap2 = heapMem.allocate(514);
Assert.assertNotEquals(obj1, onheap2.getBaseObject());

// The size is greater than `HeapMemoryAllocator.POOLING_THRESHOLD_BYTES`,
// so the previously freed memory is reused.
MemoryBlock onheap3 = heapMem.allocate(1024 * 1024 + 1);
Assert.assertEquals(onheap3.size(), 1024 * 1024 + 1);
Object obj3 = onheap3.getBaseObject();
heapMem.free(onheap3);
MemoryBlock onheap4 = heapMem.allocate(1024 * 1024 + 7);
Assert.assertEquals(onheap4.size(), 1024 * 1024 + 7);
Assert.assertEquals(obj3, onheap4.getBaseObject());
}
}
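For context, the pooling predicate this test exercises is not shown in the diff; its assumed shape is sketched below (the 1 MB value of POOLING_THRESHOLD_BYTES is an assumption, not confirmed by this page):

// Assumed sketch of HeapMemoryAllocator.shouldPool: only requests of at least
// POOLING_THRESHOLD_BYTES (believed to be 1 MB) go through the buffer pool,
// which is why the 513- and 514-byte blocks above are never reused.
private boolean shouldPool(long size) {
  return size >= POOLING_THRESHOLD_BYTES;
}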
@@ -22,6 +22,8 @@
import java.util.Random;
import java.util.Set;

import scala.util.hashing.MurmurHash3$;

import org.apache.spark.unsafe.Platform;
import org.junit.Assert;
import org.junit.Test;
@@ -51,6 +53,23 @@ public void testKnownLongInputs() {
Assert.assertEquals(-2106506049, hasher.hashLong(Long.MAX_VALUE));
}

// SPARK-23381: check that the byte-array hash matches other MurmurHash3 implementations.
// The input lengths 2-5 cover every possible tail length (0-3 bytes past the last 4-byte word).
@Test
public void testKnownBytesInputs() {
byte[] test = "test".getBytes(StandardCharsets.UTF_8);
Assert.assertEquals(MurmurHash3$.MODULE$.bytesHash(test, 0),
Murmur3_x86_32.hashUnsafeBytes2(test, Platform.BYTE_ARRAY_OFFSET, test.length, 0));
byte[] test1 = "test1".getBytes(StandardCharsets.UTF_8);
Assert.assertEquals(MurmurHash3$.MODULE$.bytesHash(test1, 0),
Murmur3_x86_32.hashUnsafeBytes2(test1, Platform.BYTE_ARRAY_OFFSET, test1.length, 0));
byte[] te = "te".getBytes(StandardCharsets.UTF_8);
Assert.assertEquals(MurmurHash3$.MODULE$.bytesHash(te, 0),
Murmur3_x86_32.hashUnsafeBytes2(te, Platform.BYTE_ARRAY_OFFSET, te.length, 0));
byte[] tes = "tes".getBytes(StandardCharsets.UTF_8);
Assert.assertEquals(MurmurHash3$.MODULE$.bytesHash(tes, 0),
Murmur3_x86_32.hashUnsafeBytes2(tes, Platform.BYTE_ARRAY_OFFSET, tes.length, 0));
}

@Test
public void randomizedStressTest() {
int size = 65536;
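A note on the MurmurHash3$.MODULE$ form used above: Java cannot call methods on a Scala object directly, so the test goes through the compiler-generated singleton field. A minimal sketch:

// Scala's `object MurmurHash3` compiles to class MurmurHash3$, whose single
// instance is exposed as the static field MODULE$; bytesHash(data, seed) is
// the reference result the new hashUnsafeBytes2 must reproduce.
byte[] bytes = "test".getBytes(java.nio.charset.StandardCharsets.UTF_8);
int expected = scala.util.hashing.MurmurHash3$.MODULE$.bytesHash(bytes, 0);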
4 changes: 4 additions & 0 deletions core/src/main/java/org/apache/spark/SparkExecutorInfo.java
@@ -30,4 +30,8 @@ public interface SparkExecutorInfo extends Serializable {
int port();
long cacheSize();
int numRunningTasks();
long usedOnHeapStorageMemory();
long usedOffHeapStorageMemory();
long totalOnHeapStorageMemory();
long totalOffHeapStorageMemory();
}
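A hedged sketch of reading the new accessors (assumes a live SparkContext named sc whose SparkStatusTracker.getExecutorInfos() returns these SparkExecutorInfo values):

// Sketch: dump the new storage-memory metrics for each executor.
for (SparkExecutorInfo info : sc.statusTracker().getExecutorInfos()) {
  System.out.printf("executor on port %d: on-heap %d/%d, off-heap %d/%d%n",
      info.port(),
      info.usedOnHeapStorageMemory(), info.totalOnHeapStorageMemory(),
      info.usedOffHeapStorageMemory(), info.totalOffHeapStorageMemory());
}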
