Skip to content

Commit

Permalink
Release memory retained by ExchangeSource
Browse files Browse the repository at this point in the history
  • Loading branch information
arhimondr authored and losipiuk committed Feb 22, 2022
1 parent c199fac commit ae713b7
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,7 @@ private static class ExchangeOutputSource
{
private final Set<TaskId> selectedTasks;
private final QueryId queryId;
private final ListenableFuture<ExchangeSource> exchangeSourceFuture;
private ListenableFuture<ExchangeSource> exchangeSourceFuture;

private ExchangeSource exchangeSource;
private boolean finished;
Expand Down Expand Up @@ -848,6 +848,7 @@ public void close()
return;
}
finished = true;
exchangeSource = null;
addCallback(exchangeSourceFuture, new FutureCallback<>()
{
@Override
Expand All @@ -868,6 +869,7 @@ public void onFailure(Throwable ignored)
// It a failure occurred it is expected to be propagated by the getNext method
}
}, directExecutor());
exchangeSourceFuture = null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.log.Logger;
import io.airlift.slice.Slice;
import io.trino.connector.CatalogName;
import io.trino.exchange.ExchangeManagerRegistry;
Expand Down Expand Up @@ -54,6 +55,8 @@
public class ExchangeOperator
implements SourceOperator
{
private static final Logger log = Logger.get(ExchangeOperator.class);

public static final CatalogName REMOTE_CONNECTOR_ID = new CatalogName("$remote");

public static class ExchangeOperatorFactory
Expand Down Expand Up @@ -459,15 +462,21 @@ public void close()
private static class SpoolingExchangeDataSource
implements ExchangeDataSource
{
private final ExchangeSource exchangeSource;
// This field is not final to allow releasing the memory retained by the ExchangeSource instance.
// It is modified (assigned to null) when the ExchangeOperator is closed.
// It doesn't have to be declared as volatile as the nullification of this variable doesn't have to be immediately visible to other threads.
// However since close can be called at any moment this variable has to be accessed in a safe way (avoiding "check-then-use").
private ExchangeSource exchangeSource;
private final List<ExchangeSourceHandle> exchangeSourceHandles;
private final LocalMemoryContext systemMemoryContext;
private volatile boolean closed;

private SpoolingExchangeDataSource(
ExchangeSource exchangeSource,
List<ExchangeSourceHandle> exchangeSourceHandles,
LocalMemoryContext systemMemoryContext)
{
// this assignment is expected to be followed by an assignment of a final field to ensure safe publication
this.exchangeSource = requireNonNull(exchangeSource, "exchangeSource is null");
this.exchangeSourceHandles = ImmutableList.copyOf(requireNonNull(exchangeSourceHandles, "exchangeSourceHandles is null"));
this.systemMemoryContext = requireNonNull(systemMemoryContext, "systemMemoryContext is null");
Expand All @@ -476,20 +485,39 @@ private SpoolingExchangeDataSource(
@Override
public Slice pollPage()
{
ExchangeSource exchangeSource = this.exchangeSource;
if (exchangeSource == null) {
return null;
}

Slice data = exchangeSource.read();
systemMemoryContext.setBytes(exchangeSource.getMemoryUsage());

// If the data source has been closed in a meantime reset memory usage back to 0
if (closed) {
systemMemoryContext.setBytes(0);
}

return data;
}

@Override
public boolean isFinished()
{
ExchangeSource exchangeSource = this.exchangeSource;
if (exchangeSource == null) {
return true;
}
return exchangeSource.isFinished();
}

@Override
public ListenableFuture<Void> isBlocked()
{
ExchangeSource exchangeSource = this.exchangeSource;
if (exchangeSource == null) {
return immediateVoidFuture();
}
return toListenableFuture(exchangeSource.isBlocked());
}

Expand Down Expand Up @@ -522,9 +550,22 @@ public OperatorInfo getInfo()
}

@Override
public void close()
public synchronized void close()
{
exchangeSource.close();
if (closed) {
return;
}
closed = true;
try {
exchangeSource.close();
}
catch (RuntimeException e) {
log.warn(e, "error closing exchange source");
}
finally {
exchangeSource = null;
systemMemoryContext.setBytes(0);
}
}
}
}

0 comments on commit ae713b7

Please sign in to comment.