Skip to content

Commit

Permalink
Optimize union of domains in LocalDynamicFilterConsumer
Browse files: browse the repository at this point in the history
Union domains only when the size limit is exceeded
  • Loading branch information
arhimondr committed Jul 15, 2022
1 parent 850db66 commit 145ec31
Showing 1 changed file with 58 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@

import javax.annotation.concurrent.GuardedBy;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

import static com.google.common.base.MoreObjects.toStringHelper;
Expand Down Expand Up @@ -61,6 +63,7 @@ public class LocalDynamicFilterConsumer
private volatile boolean collected;

private final Queue<TupleDomain<DynamicFilterId>> summaryDomains = new ConcurrentLinkedQueue<>();
private final AtomicLong summaryDomainsRetainedSizeInBytes = new AtomicLong();

public LocalDynamicFilterConsumer(Map<DynamicFilterId, Integer> buildChannels, Map<DynamicFilterId, Type> filterBuildTypes, List<Consumer<Map<DynamicFilterId, Domain>>> collectors, DataSize domainSizeLimit)
{
Expand All @@ -80,56 +83,65 @@ public void addPartition(TupleDomain<DynamicFilterId> domain)
return;
}

long domainRetainedSizeInBytes = getRetainedSizeInBytes(domain);
summaryDomainsRetainedSizeInBytes.addAndGet(domainRetainedSizeInBytes);
summaryDomains.add(domain);
// Operators collecting dynamic filters tend to finish all at the same time
// when filters are collected right before the HashBuilderOperator.
// To keep multiple task executor threads from blocking on each other
// while collecting the filters, run the heavy union operation
// outside the lock.
unionSummaryDomains();
unionSummaryDomainsIfNecessary(false);

TupleDomain<DynamicFilterId> result;
synchronized (this) {
verify(expectedPartitionCount == null || collectedPartitionCount < expectedPartitionCount);

if (collected) {
summaryDomains.clear();
clearSummaryDomains();
return;
}
collectedPartitionCount++;

boolean allPartitionsCollected = expectedPartitionCount != null && collectedPartitionCount == expectedPartitionCount;
if (allPartitionsCollected) {
// run final compaction as previous concurrent compactions may have left more than a single domain
unionSummaryDomains();
unionSummaryDomainsIfNecessary(true);
}

boolean sizeLimitExceeded = false;
TupleDomain<DynamicFilterId> summary = summaryDomains.poll();
// summary can be null as another concurrent summary compaction may be running
if (summary != null) {
if (getRetainedSizeInBytes(summary) > domainSizeLimitInBytes) {
long originalSize = getRetainedSizeInBytes(summary);
if (originalSize > domainSizeLimitInBytes) {
summary = summary.simplify(1);
}
if (getRetainedSizeInBytes(summary) > domainSizeLimitInBytes) {
summaryDomainsRetainedSizeInBytes.addAndGet(-originalSize);
sizeLimitExceeded = true;
}
summaryDomains.add(summary);
else {
summaryDomainsRetainedSizeInBytes.addAndGet(getRetainedSizeInBytes(summary) - originalSize);
summaryDomains.add(summary);
}
}

if (!allPartitionsCollected && !sizeLimitExceeded && !domain.isAll()) {
return;
}

if (sizeLimitExceeded || domain.isAll()) {
summaryDomains.clear();
clearSummaryDomains();
result = TupleDomain.all();
}
else {
verify(expectedPartitionCount != null && collectedPartitionCount == expectedPartitionCount);
verify(summaryDomains.size() == 1);
result = summaryDomains.poll();
verify(result != null);
long currentSize = summaryDomainsRetainedSizeInBytes.addAndGet(-getRetainedSizeInBytes(result));
verify(currentSize == 0, "currentSize is expected to be zero: %s", currentSize);
}
collected = true;
}
Expand All @@ -155,33 +167,60 @@ public void setPartitionCount(int partitionCount)
}
else {
// run final compaction as previous concurrent compactions may have left more than a single domain
unionSummaryDomains();
unionSummaryDomainsIfNecessary(true);
verify(summaryDomains.size() == 1);
result = summaryDomains.poll();
verify(result != null);
long currentSize = summaryDomainsRetainedSizeInBytes.addAndGet(-getRetainedSizeInBytes(result));
verify(currentSize == 0, "currentSize is expected to be zero: %s", currentSize);
}
collected = true;
}

collectors.forEach(collector -> collector.accept(convertTupleDomain(result)));
}

private void unionSummaryDomains()
private void unionSummaryDomainsIfNecessary(boolean force)
{
if (summaryDomainsRetainedSizeInBytes.get() < domainSizeLimitInBytes && !force) {
return;
}

List<TupleDomain<DynamicFilterId>> domains = new ArrayList<>();
long domainsRetainedSizeInBytes = 0;
while (true) {
// This method is called every time a new domain is added to the summaryDomains queue.
// In a normal situation (when there's no race) there should be no more than 2 domains in the queue.
TupleDomain<DynamicFilterId> first = summaryDomains.poll();
if (first == null) {
return;
TupleDomain<DynamicFilterId> domain = summaryDomains.poll();
if (domain == null) {
break;
}
TupleDomain<DynamicFilterId> second = summaryDomains.poll();
if (second == null) {
summaryDomains.add(first);
return;
domains.add(domain);
domainsRetainedSizeInBytes += getRetainedSizeInBytes(domain);
}

if (domains.isEmpty()) {
return;
}

TupleDomain<DynamicFilterId> union = columnWiseUnion(domains);
summaryDomainsRetainedSizeInBytes.addAndGet(getRetainedSizeInBytes(union) - domainsRetainedSizeInBytes);
long currentSize = summaryDomainsRetainedSizeInBytes.get();
verify(currentSize >= 0, "currentSize is expected to be greater than or equal to zero: %s", currentSize);
summaryDomains.add(union);
}

private void clearSummaryDomains()
{
long domainsRetainedSizeInBytes = 0;
while (true) {
TupleDomain<DynamicFilterId> domain = summaryDomains.poll();
if (domain == null) {
break;
}
summaryDomains.add(columnWiseUnion(first, second));
domainsRetainedSizeInBytes += getRetainedSizeInBytes(domain);
}
summaryDomainsRetainedSizeInBytes.addAndGet(-domainsRetainedSizeInBytes);
long currentSize = summaryDomainsRetainedSizeInBytes.get();
verify(currentSize >= 0, "currentSize is expected to be greater than or equal to zero: %s", currentSize);
}

private Map<DynamicFilterId, Domain> convertTupleDomain(TupleDomain<DynamicFilterId> result)
Expand Down Expand Up @@ -246,6 +285,7 @@ public synchronized String toString()
.add("collectedPartitionCount", collectedPartitionCount)
.add("collected", collected)
.add("summaryDomains", summaryDomains)
.add("summaryDomainsRetainedSizeInBytes", summaryDomainsRetainedSizeInBytes)
.toString();
}

Expand Down

0 comments on commit 145ec31

Please sign in to comment.