Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

Commit

Permalink
feat: introduce closeAsync to Batcher (#1423)
Browse files Browse the repository at this point in the history
* feat: introduce closeAsync to Batcher

This should allow callers to signal that they are done using a batcher without blocking their thread.

* improve comments

* add tests

* format
  • Loading branch information
igorbernstein2 authored Jul 16, 2021
1 parent 87636a5 commit aab5288
Show file tree
Hide file tree
Showing 3 changed files with 200 additions and 13 deletions.
10 changes: 9 additions & 1 deletion gax/src/main/java/com/google/api/gax/batching/Batcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import com.google.api.core.ApiFuture;
import com.google.api.core.BetaApi;
import com.google.api.core.InternalExtensionOnly;

/**
* Represents a batching context where individual elements will be accumulated and flushed in a
Expand All @@ -45,6 +46,7 @@
* @param <ElementResultT> The type of the result for each individual element.
*/
@BetaApi("The surface for batching is not stable yet and may change in the future.")
@InternalExtensionOnly
public interface Batcher<ElementT, ElementResultT> extends AutoCloseable {

/**
Expand Down Expand Up @@ -74,9 +76,15 @@ public interface Batcher<ElementT, ElementResultT> extends AutoCloseable {
void sendOutstanding();

/**
* Closes this Batcher by preventing new elements from being added and flushing the existing
* Closes this Batcher by preventing new elements from being added, and then flushing the existing
* elements.
*/
@Override
void close() throws InterruptedException;

/**
* Closes this Batcher by preventing new elements from being added, and then sending outstanding
* elements. The returned future will be resolved when the last element completes
*/
ApiFuture<Void> closeAsync();
}
77 changes: 66 additions & 11 deletions gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -89,7 +90,7 @@ public class BatcherImpl<ElementT, ElementResultT, RequestT, ResponseT>
private final Object flushLock = new Object();
private final Object elementLock = new Object();
private final Future<?> scheduledFuture;
private volatile boolean isClosed = false;
private SettableApiFuture<Void> closeFuture;
private final BatcherStats batcherStats = new BatcherStats();
private final FlowController flowController;

Expand Down Expand Up @@ -172,7 +173,10 @@ public BatcherImpl(
/** {@inheritDoc} */
@Override
public ApiFuture<ElementResultT> add(ElementT element) {
Preconditions.checkState(!isClosed, "Cannot add elements on a closed batcher");
// Note: there is no need to synchronize over closeFuture. The write & read of the variable
// will only be done from a single calling thread.
Preconditions.checkState(closeFuture == null, "Cannot add elements on a closed batcher");

// This is not the optimal way of throttling. It does not send out partial batches, which
// means that the Batcher might not use up all the resources allowed by FlowController.
// The more efficient implementation should look like:
Expand Down Expand Up @@ -257,9 +261,20 @@ public void onFailure(Throwable throwable) {
}

private void onBatchCompletion() {
if (numOfOutstandingBatches.decrementAndGet() == 0) {
synchronized (flushLock) {
boolean shouldClose = false;

synchronized (flushLock) {
if (numOfOutstandingBatches.decrementAndGet() == 0) {
flushLock.notifyAll();
shouldClose = closeFuture != null;
}
}
if (shouldClose) {
BatchingException overallError = batcherStats.asException();
if (overallError != null) {
closeFuture.setException(overallError);
} else {
closeFuture.set(null);
}
}
}
Expand All @@ -279,17 +294,57 @@ private void awaitAllOutstandingBatches() throws InterruptedException {
/** {@inheritDoc} */
@Override
public void close() throws InterruptedException {
if (isClosed) {
return;
try {
closeAsync().get();
} catch (ExecutionException e) {
// Original stacktrace of a batching exception is not useful, so rethrow the error with
// the caller stacktrace
if (e.getCause() instanceof BatchingException) {
BatchingException cause = (BatchingException) e.getCause();
throw new BatchingException(cause.getMessage());
} else {
throw new IllegalStateException("unexpected error closing the batcher", e.getCause());
}
}
}

@Override
public ApiFuture<Void> closeAsync() {
if (closeFuture != null) {
return closeFuture;
}

// Send any buffered elements
// Must come before numOfOutstandingBatches check below
sendOutstanding();

boolean closeImmediately;

synchronized (flushLock) {
// prevent admission of new elements
closeFuture = SettableApiFuture.create();
// check if we can close immediately
closeImmediately = numOfOutstandingBatches.get() == 0;
}
flush();

// Clean up accounting
scheduledFuture.cancel(true);
isClosed = true;
currentBatcherReference.closed = true;
currentBatcherReference.clear();
BatchingException exception = batcherStats.asException();
if (exception != null) {
throw exception;

// notify futures
if (closeImmediately) {
finishClose();
}
return closeFuture;
}

private void finishClose() {
BatchingException batchingException = batcherStats.asException();
if (batchingException != null) {
closeFuture.setException(batchingException);
} else {
closeFuture.set(null);
}
}

Expand Down
126 changes: 125 additions & 1 deletion gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import com.google.api.gax.rpc.testing.FakeBatchableApi.LabeledIntList;
import com.google.api.gax.rpc.testing.FakeBatchableApi.LabeledIntSquarerCallable;
import com.google.api.gax.rpc.testing.FakeBatchableApi.SquarerBatchingDescriptorV2;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Queues;
import java.util.ArrayList;
Expand All @@ -69,7 +70,9 @@
import java.util.logging.Logger;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;
import org.junit.function.ThrowingRunnable;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.threeten.bp.Duration;
Expand All @@ -92,7 +95,12 @@ public class BatcherImplTest {
@After
public void tearDown() throws InterruptedException {
if (underTest != null) {
underTest.close();
try {
// Close the batcher to avoid warnings of orphaned batchers
underTest.close();
} catch (BatchingException ignored) {
// Some tests intentionally inject failures into mutations
}
}
}

Expand Down Expand Up @@ -172,6 +180,55 @@ public void testNoElementAdditionAfterClose() throws Exception {
.matches("Cannot add elements on a closed batcher");
}

/** Validates exception when batch is called after {@link Batcher#close()}. */
@Test
public void testNoElementAdditionAfterCloseAsync() throws Exception {
underTest = createDefaultBatcherImpl(batchingSettings, null);
underTest.add(1);
underTest.closeAsync();

IllegalStateException e =
Assert.assertThrows(
IllegalStateException.class,
new ThrowingRunnable() {
@Override
public void run() throws Throwable {
underTest.add(1);
}
});

assertThat(e).hasMessageThat().matches("Cannot add elements on a closed batcher");
}

@Test
public void testCloseAsyncNonblocking() throws ExecutionException, InterruptedException {
final SettableApiFuture<List<Integer>> innerFuture = SettableApiFuture.create();

UnaryCallable<LabeledIntList, List<Integer>> unaryCallable =
new UnaryCallable<LabeledIntList, List<Integer>>() {
@Override
public ApiFuture<List<Integer>> futureCall(
LabeledIntList request, ApiCallContext context) {
return innerFuture;
}
};
underTest =
new BatcherImpl<>(
SQUARER_BATCHING_DESC_V2, unaryCallable, labeledIntList, batchingSettings, EXECUTOR);

ApiFuture<Integer> elementFuture = underTest.add(1);

Stopwatch stopwatch = Stopwatch.createStarted();
ApiFuture<Void> closeFuture = underTest.closeAsync();
assertThat(stopwatch.elapsed(TimeUnit.MILLISECONDS)).isAtMost(100);

assertThat(closeFuture.isDone()).isFalse();
assertThat(elementFuture.isDone()).isFalse();

innerFuture.set(ImmutableList.of(1));
closeFuture.get();
}

/** Verifies exception occurred at RPC is propagated to element results */
@Test
public void testResultFailureAfterRPCFailure() throws Exception {
Expand Down Expand Up @@ -614,6 +671,73 @@ public boolean isLoggable(LogRecord record) {
}
}

/**
* Validates the absence of warning in case {@link BatcherImpl} is garbage collected after being
* closed.
*
* <p>Note:This test cannot run concurrently with other tests that use Batchers.
*/
@Test
public void testClosedBatchersAreNotLogged() throws Exception {
// Clean out the existing instances
final long DELAY_TIME = 30L;
int actualRemaining = 0;
for (int retry = 0; retry < 3; retry++) {
System.gc();
System.runFinalization();
actualRemaining = BatcherReference.cleanQueue();
if (actualRemaining == 0) {
break;
}
Thread.sleep(DELAY_TIME * (1L << retry));
}
assertThat(actualRemaining).isAtMost(0);

// Capture logs
final List<LogRecord> records = new ArrayList<>(1);
Logger batcherLogger = Logger.getLogger(BatcherImpl.class.getName());
Filter oldFilter = batcherLogger.getFilter();
batcherLogger.setFilter(
new Filter() {
@Override
public boolean isLoggable(LogRecord record) {
synchronized (records) {
records.add(record);
}
return false;
}
});

try {
// Create a bunch of batchers that will garbage collected after being closed
for (int i = 0; i < 1_000; i++) {
BatcherImpl<Integer, Integer, LabeledIntList, List<Integer>> batcher =
createDefaultBatcherImpl(batchingSettings, null);
batcher.add(1);

if (i % 2 == 0) {
batcher.close();
} else {
batcher.closeAsync();
}
}
// Run GC a few times to give the batchers a chance to be collected
for (int retry = 0; retry < 100; retry++) {
System.gc();
System.runFinalization();
BatcherReference.cleanQueue();
Thread.sleep(10);
}

synchronized (records) {
assertThat(records).isEmpty();
}
} finally {
// reset logging
batcherLogger.setFilter(oldFilter);
}
}

@Test
public void testCloseRace() throws ExecutionException, InterruptedException, TimeoutException {
int iterations = 1_000_000;
Expand Down

0 comments on commit aab5288

Please sign in to comment.