[SPARK-26289][CORE] cleanup enablePerfMetrics parameter from BytesToBytesMap

## What changes were proposed in this pull request?

`enablePerfMetrics` was originally added to `BytesToBytesMap` to gate three metrics: `getNumHashCollisions`, `getTimeSpentResizingNs`, and `getAverageProbesPerLookup`.

However, as Spark evolved, the parameter came to gate only `getAverageProbesPerLookup`, and every caller of `BytesToBytesMap` now passes `true` for it.

It is also fragile for `getAverageProbesPerLookup` to throw an `IllegalStateException` when the flag happens to be off. This PR therefore removes the `enablePerfMetrics` parameter from `BytesToBytesMap` entirely.
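
For context, a minimal caller sketch of the simplified API after this change (hypothetical example code, not part of this diff; the memory-manager wiring assumes the test-only `TestMemoryManager` helper):

```java
import org.apache.spark.SparkConf;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.memory.TestMemoryManager;
import org.apache.spark.unsafe.map.BytesToBytesMap;

public class ProbeMetricSketch {
  public static void main(String[] args) {
    // Test-style memory wiring, mirroring the suites touched by this diff.
    TaskMemoryManager taskMemoryManager =
        new TaskMemoryManager(new TestMemoryManager(new SparkConf()), 0);

    // Post-change constructor: no trailing enablePerfMetrics flag.
    BytesToBytesMap map = new BytesToBytesMap(taskMemoryManager, 64, 1024 * 1024);
    try {
      // ... perform map.lookup(...) calls here; every lookup now increments
      // numKeyLookups and every probe step increments numProbes,
      // unconditionally ...

      // Always available after this change: numProbes / numKeyLookups.
      // Previously this threw IllegalStateException when the map was
      // constructed with enablePerfMetrics = false.
      double avgProbes = map.getAverageProbesPerLookup();
      System.out.println("average probes per lookup: " + avgProbes);
    } finally {
      map.free();
    }
  }
}
```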

## How was this patch tested?

Existing test cases.

Closes #23244 from heary-cao/enablePerfMetrics.

Authored-by: caoxuewen <cao.xuewen@zte.com.cn>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
heary-cao authored and cloud-fan committed Dec 7, 2018
1 parent dbd90e5 commit bfc5569
Showing 4 changed files with 12 additions and 33 deletions.
BytesToBytesMap.java:

```diff
@@ -159,11 +159,9 @@ public final class BytesToBytesMap extends MemoryConsumer {
    */
   private final Location loc;
 
-  private final boolean enablePerfMetrics;
-
-  private long numProbes = 0;
+  private long numProbes = 0L;
 
-  private long numKeyLookups = 0;
+  private long numKeyLookups = 0L;
 
   private long peakMemoryUsedBytes = 0L;
 
@@ -180,16 +178,14 @@ public BytesToBytesMap(
       SerializerManager serializerManager,
       int initialCapacity,
       double loadFactor,
-      long pageSizeBytes,
-      boolean enablePerfMetrics) {
+      long pageSizeBytes) {
     super(taskMemoryManager, pageSizeBytes, taskMemoryManager.getTungstenMemoryMode());
     this.taskMemoryManager = taskMemoryManager;
     this.blockManager = blockManager;
     this.serializerManager = serializerManager;
     this.loadFactor = loadFactor;
     this.loc = new Location();
     this.pageSizeBytes = pageSizeBytes;
-    this.enablePerfMetrics = enablePerfMetrics;
     if (initialCapacity <= 0) {
       throw new IllegalArgumentException("Initial capacity must be greater than 0");
     }
@@ -209,23 +205,14 @@ public BytesToBytesMap(
       TaskMemoryManager taskMemoryManager,
       int initialCapacity,
       long pageSizeBytes) {
-    this(taskMemoryManager, initialCapacity, pageSizeBytes, false);
-  }
-
-  public BytesToBytesMap(
-      TaskMemoryManager taskMemoryManager,
-      int initialCapacity,
-      long pageSizeBytes,
-      boolean enablePerfMetrics) {
     this(
       taskMemoryManager,
       SparkEnv.get() != null ? SparkEnv.get().blockManager() : null,
       SparkEnv.get() != null ? SparkEnv.get().serializerManager() : null,
       initialCapacity,
       // In order to re-use the longArray for sorting, the load factor cannot be larger than 0.5.
       0.5,
-      pageSizeBytes,
-      enablePerfMetrics);
+      pageSizeBytes);
   }
 
   /**
@@ -462,15 +449,12 @@ public Location lookup(Object keyBase, long keyOffset, int keyLength, int hash)
   public void safeLookup(Object keyBase, long keyOffset, int keyLength, Location loc, int hash) {
     assert(longArray != null);
 
-    if (enablePerfMetrics) {
-      numKeyLookups++;
-    }
+    numKeyLookups++;
+
     int pos = hash & mask;
     int step = 1;
     while (true) {
-      if (enablePerfMetrics) {
-        numProbes++;
-      }
+      numProbes++;
       if (longArray.get(pos * 2) == 0) {
         // This is a new key.
         loc.with(pos, hash, false);
@@ -860,9 +844,6 @@ public long getPeakMemoryUsedBytes() {
    * Returns the average number of probes per key lookup.
    */
   public double getAverageProbesPerLookup() {
-    if (!enablePerfMetrics) {
-      throw new IllegalStateException();
-    }
     return (1.0 * numProbes) / numKeyLookups;
   }
```
AbstractBytesToBytesMapSuite.java:

```diff
@@ -530,7 +530,7 @@ public void failureToGrow() {
   @Test
   public void spillInIterator() throws IOException {
     BytesToBytesMap map = new BytesToBytesMap(
-      taskMemoryManager, blockManager, serializerManager, 1, 0.75, 1024, false);
+      taskMemoryManager, blockManager, serializerManager, 1, 0.75, 1024);
     try {
       int i;
       for (i = 0; i < 1024; i++) {
@@ -569,7 +569,7 @@ public void spillInIterator() throws IOException {
   @Test
   public void multipleValuesForSameKey() {
     BytesToBytesMap map =
-      new BytesToBytesMap(taskMemoryManager, blockManager, serializerManager, 1, 0.5, 1024, false);
+      new BytesToBytesMap(taskMemoryManager, blockManager, serializerManager, 1, 0.5, 1024);
     try {
       int i;
       for (i = 0; i < 1024; i++) {
```
UnsafeFixedWidthAggregationMap.java:

```diff
@@ -98,7 +98,7 @@ public UnsafeFixedWidthAggregationMap(
     this.groupingKeyProjection = UnsafeProjection.create(groupingKeySchema);
     this.groupingKeySchema = groupingKeySchema;
     this.map = new BytesToBytesMap(
-      taskContext.taskMemoryManager(), initialCapacity, pageSizeBytes, true);
+      taskContext.taskMemoryManager(), initialCapacity, pageSizeBytes);
 
     // Initialize the buffer for aggregation value
     final UnsafeProjection valueProjection = UnsafeProjection.create(aggregationBufferSchema);
```
HashedRelation.scala:

```diff
@@ -248,8 +248,7 @@ private[joins] class UnsafeHashedRelation(
     binaryMap = new BytesToBytesMap(
       taskMemoryManager,
       (nKeys * 1.5 + 1).toInt, // reduce hash collision
-      pageSizeBytes,
-      true)
+      pageSizeBytes)
 
     var i = 0
     var keyBuffer = new Array[Byte](1024)
@@ -299,8 +298,7 @@ private[joins] object UnsafeHashedRelation {
       taskMemoryManager,
       // Only 70% of the slots can be used before growing, more capacity help to reduce collision
       (sizeEstimate * 1.5 + 1).toInt,
-      pageSizeBytes,
-      true)
+      pageSizeBytes)
 
     // Create a mapping of buildKeys -> rows
     val keyGenerator = UnsafeProjection.create(key)
```
