Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

When running TPC-DS benchmarks on 2.4 release, npoggi and winglungngai  saw more than 10% performance regression on the following queries: q67, q24a and q24b. After we applying the PR #22338, the performance regression still exists. If we revert the changes in #19222, npoggi and winglungngai  found the performance regression was resolved. Thus, this PR is to revert the related changes for unblocking the 2.4 release.

In the future release, we still can continue the investigation and find out the root cause of the regression.

## How was this patch tested?

The existing test cases

Closes #22361 from gatorsmile/revertMemoryBlock.

Authored-by: gatorsmile <gatorsmile@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 0b9ccd5)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
gatorsmile authored and cloud-fan committed Sep 9, 2018
1 parent 8f7d8a0 commit a00a160
Show file tree
Hide file tree
Showing 40 changed files with 376 additions and 1,070 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@

package org.apache.spark.sql.catalyst.expressions;

import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.unsafe.types.UTF8String;
import org.apache.spark.unsafe.Platform;

/**
* Simulates Hive's hashing function from Hive v1.2.1
Expand All @@ -39,21 +38,12 @@ public static int hashLong(long input) {
return (int) ((input >>> 32) ^ input);
}

public static int hashUnsafeBytesBlock(MemoryBlock mb) {
long lengthInBytes = mb.size();
public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes) {
assert (lengthInBytes >= 0): "lengthInBytes cannot be negative";
int result = 0;
for (long i = 0; i < lengthInBytes; i++) {
result = (result * 31) + (int) mb.getByte(i);
for (int i = 0; i < lengthInBytes; i++) {
result = (result * 31) + (int) Platform.getByte(base, offset + i);
}
return result;
}

public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes) {
return hashUnsafeBytesBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes));
}

public static int hashUTF8String(UTF8String str) {
return hashUnsafeBytesBlock(str.getMemoryBlock());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public static void setMemory(long address, byte value, long size) {
}

public static void copyMemory(
Object src, long srcOffset, Object dst, long dstOffset, long length) {
Object src, long srcOffset, Object dst, long dstOffset, long length) {
// Check if dstOffset is before or after srcOffset to determine if we should copy
// forward or backwards. This is necessary in case src and dst overlap.
if (dstOffset < srcOffset) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.spark.unsafe.array;

import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.memory.MemoryBlock;

public class ByteArrayMethods {

Expand Down Expand Up @@ -53,25 +52,15 @@ public static long roundNumberOfBytesToNearestWord(long numBytes) {
public static int MAX_ROUNDED_ARRAY_LENGTH = Integer.MAX_VALUE - 15;

private static final boolean unaligned = Platform.unaligned();
/**
* MemoryBlock equality check for MemoryBlocks.
* @return true if the arrays are equal, false otherwise
*/
public static boolean arrayEqualsBlock(
MemoryBlock leftBase, long leftOffset, MemoryBlock rightBase, long rightOffset, long length) {
return arrayEquals(leftBase.getBaseObject(), leftBase.getBaseOffset() + leftOffset,
rightBase.getBaseObject(), rightBase.getBaseOffset() + rightOffset, length);
}

/**
* Optimized byte array equality check for byte arrays.
* @return true if the arrays are equal, false otherwise
*/
public static boolean arrayEquals(
Object leftBase, long leftOffset, Object rightBase, long rightOffset, long length) {
Object leftBase, long leftOffset, Object rightBase, long rightOffset, final long length) {
int i = 0;

// check if starts align and we can get both offsets to be aligned
// check if stars align and we can get both offsets to be aligned
if ((leftOffset % 8) == (rightOffset % 8)) {
while ((leftOffset + i) % 8 != 0 && i < length) {
if (Platform.getByte(leftBase, leftOffset + i) !=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.unsafe.array;

import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.memory.MemoryBlock;

/**
Expand All @@ -32,12 +33,16 @@ public final class LongArray {
private static final long WIDTH = 8;

private final MemoryBlock memory;
private final Object baseObj;
private final long baseOffset;

private final long length;

public LongArray(MemoryBlock memory) {
assert memory.size() < (long) Integer.MAX_VALUE * 8: "Array size >= Integer.MAX_VALUE elements";
this.memory = memory;
this.baseObj = memory.getBaseObject();
this.baseOffset = memory.getBaseOffset();
this.length = memory.size() / WIDTH;
}

Expand All @@ -46,11 +51,11 @@ public MemoryBlock memoryBlock() {
}

public Object getBaseObject() {
return memory.getBaseObject();
return baseObj;
}

public long getBaseOffset() {
return memory.getBaseOffset();
return baseOffset;
}

/**
Expand All @@ -64,8 +69,8 @@ public long size() {
* Fill this all with 0L.
*/
public void zeroOut() {
for (long off = 0; off < length * WIDTH; off += WIDTH) {
memory.putLong(off, 0);
for (long off = baseOffset; off < baseOffset + length * WIDTH; off += WIDTH) {
Platform.putLong(baseObj, off, 0);
}
}

Expand All @@ -75,7 +80,7 @@ public void zeroOut() {
public void set(int index, long value) {
assert index >= 0 : "index (" + index + ") should >= 0";
assert index < length : "index (" + index + ") should < length (" + length + ")";
memory.putLong(index * WIDTH, value);
Platform.putLong(baseObj, baseOffset + index * WIDTH, value);
}

/**
Expand All @@ -84,6 +89,6 @@ public void set(int index, long value) {
public long get(int index) {
assert index >= 0 : "index (" + index + ") should >= 0";
assert index < length : "index (" + index + ") should < length (" + length + ")";
return memory.getLong(index * WIDTH);
return Platform.getLong(baseObj, baseOffset + index * WIDTH);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,7 @@

package org.apache.spark.unsafe.hash;

import com.google.common.primitives.Ints;

import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.unsafe.types.UTF8String;

/**
* 32-bit Murmur3 hasher. This is based on Guava's Murmur3_32HashFunction.
Expand Down Expand Up @@ -53,74 +49,49 @@ public static int hashInt(int input, int seed) {
}

public int hashUnsafeWords(Object base, long offset, int lengthInBytes) {
return hashUnsafeWordsBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed);
return hashUnsafeWords(base, offset, lengthInBytes, seed);
}

public static int hashUnsafeWordsBlock(MemoryBlock base, int seed) {
public static int hashUnsafeWords(Object base, long offset, int lengthInBytes, int seed) {
// This is based on Guava's `Murmur32_Hasher.processRemaining(ByteBuffer)` method.
int lengthInBytes = Ints.checkedCast(base.size());
assert (lengthInBytes % 8 == 0): "lengthInBytes must be a multiple of 8 (word-aligned)";
int h1 = hashBytesByIntBlock(base, lengthInBytes, seed);
int h1 = hashBytesByInt(base, offset, lengthInBytes, seed);
return fmix(h1, lengthInBytes);
}

public static int hashUnsafeWords(Object base, long offset, int lengthInBytes, int seed) {
// This is based on Guava's `Murmur32_Hasher.processRemaining(ByteBuffer)` method.
return hashUnsafeWordsBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed);
}

public static int hashUnsafeBytesBlock(MemoryBlock base, int seed) {
return hashUnsafeBytesBlock(base, Ints.checkedCast(base.size()), seed);
}

private static int hashUnsafeBytesBlock(MemoryBlock base, int lengthInBytes, int seed) {
public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes, int seed) {
// This is not compatible with original and another implementations.
// But remain it for backward compatibility for the components existing before 2.3.
assert (lengthInBytes >= 0): "lengthInBytes cannot be negative";
int lengthAligned = lengthInBytes - lengthInBytes % 4;
int h1 = hashBytesByIntBlock(base, lengthAligned, seed);
long offset = base.getBaseOffset();
Object o = base.getBaseObject();
int h1 = hashBytesByInt(base, offset, lengthAligned, seed);
for (int i = lengthAligned; i < lengthInBytes; i++) {
int halfWord = Platform.getByte(o, offset + i);
int halfWord = Platform.getByte(base, offset + i);
int k1 = mixK1(halfWord);
h1 = mixH1(h1, k1);
}
return fmix(h1, lengthInBytes);
}

public static int hashUTF8String(UTF8String str, int seed) {
return hashUnsafeBytesBlock(str.getMemoryBlock(), str.numBytes(), seed);
}

public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes, int seed) {
return hashUnsafeBytesBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed);
}

public static int hashUnsafeBytes2(Object base, long offset, int lengthInBytes, int seed) {
return hashUnsafeBytes2Block(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed);
}

public static int hashUnsafeBytes2Block(MemoryBlock base, int seed) {
// This is compatible with original and other implementations.
// This is compatible with original and another implementations.
// Use this method for new components after Spark 2.3.
int lengthInBytes = Ints.checkedCast(base.size());
assert (lengthInBytes >= 0) : "lengthInBytes cannot be negative";
assert (lengthInBytes >= 0): "lengthInBytes cannot be negative";
int lengthAligned = lengthInBytes - lengthInBytes % 4;
int h1 = hashBytesByIntBlock(base, lengthAligned, seed);
int h1 = hashBytesByInt(base, offset, lengthAligned, seed);
int k1 = 0;
for (int i = lengthAligned, shift = 0; i < lengthInBytes; i++, shift += 8) {
k1 ^= (base.getByte(i) & 0xFF) << shift;
k1 ^= (Platform.getByte(base, offset + i) & 0xFF) << shift;
}
h1 ^= mixK1(k1);
return fmix(h1, lengthInBytes);
}

private static int hashBytesByIntBlock(MemoryBlock base, int lengthInBytes, int seed) {
private static int hashBytesByInt(Object base, long offset, int lengthInBytes, int seed) {
assert (lengthInBytes % 4 == 0);
int h1 = seed;
for (int i = 0; i < lengthInBytes; i += 4) {
int halfWord = base.getInt(i);
int halfWord = Platform.getInt(base, offset + i);
int k1 = mixK1(halfWord);
h1 = mixH1(h1, k1);
}
Expand Down

This file was deleted.

Loading

0 comments on commit a00a160

Please sign in to comment.