Skip to content

Commit

Permalink
Drop unnecessary synchronization
Browse files Browse the repository at this point in the history
A new instance is created for each ExchangeSink
  • Loading branch information
arhimondr committed Nov 20, 2022
1 parent 96464e5 commit 45e111c
Showing 1 changed file with 3 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,22 @@
import it.unimi.dsi.fastutil.ints.IntArrayList;

import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.NotThreadSafe;

import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

import static java.util.Objects.requireNonNull;

@NotThreadSafe
class PartitioningExchanger
implements LocalExchanger
{
private final List<Consumer<Page>> buffers;
private final LocalExchangeMemoryManager memoryManager;
private final Function<Page, Page> partitionedPagePreparer;
private final PartitionFunction partitionFunction;
@GuardedBy("this")
private final IntArrayList[] partitionAssignments;

public PartitioningExchanger(
Expand Down Expand Up @@ -68,7 +68,7 @@ public void accept(Page page)
}

@Nullable
private synchronized Consumer<Page> partitionPageOrFindWholePagePartition(Page page, Page partitionPage)
private Consumer<Page> partitionPageOrFindWholePagePartition(Page page, Page partitionPage)
{
// assign each row to a partition. The assignments lists are all expected to cleared by the previous iterations
for (int position = 0; position < partitionPage.getPositionCount(); position++) {
Expand Down

0 comments on commit 45e111c

Please sign in to comment.