From ff4b61e49ab5b6ab91c9c03ee5c5aeb071d3b03c Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Sat, 1 Sep 2018 14:50:54 -0600 Subject: [PATCH] batching: fix permit leak (#567) Previously flow control was released only if the RPC returns successfully. This caused us to leak permits if RPCs fail. This commit makes us unconditionally release permits. --- .../batching/AccumulatingBatchReceiver.java | 20 ++- .../api/gax/batching/ThresholdBatcher.java | 33 ++++- .../gax/batching/ThresholdBatcherTest.java | 121 +++++++++++++----- 3 files changed, 130 insertions(+), 44 deletions(-) diff --git a/gax/src/main/java/com/google/api/gax/batching/AccumulatingBatchReceiver.java b/gax/src/main/java/com/google/api/gax/batching/AccumulatingBatchReceiver.java index eb0d5f0d8..5a7a30280 100644 --- a/gax/src/main/java/com/google/api/gax/batching/AccumulatingBatchReceiver.java +++ b/gax/src/main/java/com/google/api/gax/batching/AccumulatingBatchReceiver.java @@ -30,15 +30,20 @@ package com.google.api.gax.batching; import com.google.api.core.ApiFuture; -import com.google.api.core.ApiFutures; import com.google.api.core.BetaApi; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; -/** A simple ThresholdBatchReceiver that just accumulates batches. Not thread-safe. */ +/** A simple ThresholdBatchReceiver that just accumulates batches. */ @BetaApi("The surface for batching is not stable yet and may change in the future.") public final class AccumulatingBatchReceiver implements ThresholdBatchReceiver { - private final List batches = new ArrayList<>(); + private final ConcurrentLinkedQueue batches = new ConcurrentLinkedQueue<>(); + private final ApiFuture retFuture; + + public AccumulatingBatchReceiver(ApiFuture retFuture) { + this.retFuture = retFuture; + } @Override public void validateBatch(T message) { @@ -48,11 +53,14 @@ public void validateBatch(T message) { @Override public ApiFuture processBatch(T batch) { batches.add(batch); - return ApiFutures.immediateFuture(null); + return retFuture; } - /** Returns the accumulated batches. */ + /** + * Returns the accumulated batches. If called concurrently with {@code processBatch}, the new + * batch may or may not be returned. + */ public List getBatches() { - return batches; + return new ArrayList<>(batches); } } diff --git a/gax/src/main/java/com/google/api/gax/batching/ThresholdBatcher.java b/gax/src/main/java/com/google/api/gax/batching/ThresholdBatcher.java index 057b7f91d..08f3ca070 100644 --- a/gax/src/main/java/com/google/api/gax/batching/ThresholdBatcher.java +++ b/gax/src/main/java/com/google/api/gax/batching/ThresholdBatcher.java @@ -33,8 +33,10 @@ import com.google.api.core.ApiFunction; import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; import com.google.api.core.BetaApi; +import com.google.api.core.SettableApiFuture; import com.google.api.gax.batching.FlowController.FlowControlException; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -216,10 +218,35 @@ public ApiFuture pushCurrentBatch() { final E batch = removeBatch(); if (batch == null) { return ApiFutures.immediateFuture(null); - } else { - return ApiFutures.transform( - receiver.processBatch(batch), new ReleaseResourcesFunction<>(batch), directExecutor()); } + + final SettableApiFuture retFuture = SettableApiFuture.create(); + + // It is tempting to use transform to both release and get ApiFuture. + // This is incorrect because we also need to release on failure. + // + // It is also tempting to transform to get ApiFuture and addListener + // separately to release. This probably works as most users expect, + // but makes this class hard to test: retFuture.get() returning + // won't guarantee that flow control has been released. + ApiFutures.addCallback( + receiver.processBatch(batch), + new ApiFutureCallback() { + @Override + public void onSuccess(Object obj) { + flowController.release(batch); + retFuture.set(null); + } + + @Override + public void onFailure(Throwable t) { + flowController.release(batch); + retFuture.setException(t); + } + }, + directExecutor()); + + return retFuture; } private E removeBatch() { diff --git a/gax/src/test/java/com/google/api/gax/batching/ThresholdBatcherTest.java b/gax/src/test/java/com/google/api/gax/batching/ThresholdBatcherTest.java index 6291a47fc..ce94853c0 100644 --- a/gax/src/test/java/com/google/api/gax/batching/ThresholdBatcherTest.java +++ b/gax/src/test/java/com/google/api/gax/batching/ThresholdBatcherTest.java @@ -29,14 +29,18 @@ */ package com.google.api.gax.batching; +import static com.google.common.truth.Truth.assertThat; + +import com.google.api.core.ApiFutures; import com.google.api.gax.batching.FlowController.FlowControlException; import com.google.api.gax.batching.FlowController.LimitExceededBehavior; -import com.google.common.truth.Truth; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; +import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -142,21 +146,23 @@ private static ThresholdBatcher.Builder createSimpleBatcherBuidler( @Test public void testAdd() throws Exception { - AccumulatingBatchReceiver receiver = new AccumulatingBatchReceiver<>(); + AccumulatingBatchReceiver receiver = + new AccumulatingBatchReceiver<>(ApiFutures.immediateFuture(null)); ThresholdBatcher batcher = createSimpleBatcherBuidler(receiver).build(); batcher.add(SimpleBatch.fromInteger(14)); - Truth.assertThat(batcher.isEmpty()).isFalse(); - Truth.assertThat(receiver.getBatches().size()).isEqualTo(0); + assertThat(batcher.isEmpty()).isFalse(); + assertThat(receiver.getBatches()).hasSize(0); batcher.pushCurrentBatch().get(); - Truth.assertThat(batcher.isEmpty()).isTrue(); - Truth.assertThat(receiver.getBatches().size()).isEqualTo(1); - Truth.assertThat(receiver.getBatches().get(0).getIntegers()).isEqualTo(Arrays.asList(14)); + assertThat(batcher.isEmpty()).isTrue(); + assertThat(receiver.getBatches()).hasSize(1); + assertThat(receiver.getBatches().get(0).getIntegers()).isEqualTo(Arrays.asList(14)); } @Test public void testBatching() throws Exception { - AccumulatingBatchReceiver receiver = new AccumulatingBatchReceiver<>(); + AccumulatingBatchReceiver receiver = + new AccumulatingBatchReceiver<>(ApiFutures.immediateFuture(null)); ThresholdBatcher batcher = createSimpleBatcherBuidler(receiver) .setThresholds(BatchingThresholds.create(2)) @@ -166,13 +172,13 @@ public void testBatching() throws Exception { batcher.add(SimpleBatch.fromInteger(5)); // Give time for the executor to push the batch Thread.sleep(100); - Truth.assertThat(receiver.getBatches().size()).isEqualTo(1); + assertThat(receiver.getBatches()).hasSize(1); batcher.add(SimpleBatch.fromInteger(7)); batcher.add(SimpleBatch.fromInteger(9)); // Give time for the executor to push the batch Thread.sleep(100); - Truth.assertThat(receiver.getBatches().size()).isEqualTo(2); + assertThat(receiver.getBatches()).hasSize(2); batcher.add(SimpleBatch.fromInteger(11)); @@ -184,12 +190,13 @@ public void testBatching() throws Exception { for (SimpleBatch batch : receiver.getBatches()) { actual.add(batch.getIntegers()); } - Truth.assertThat(actual).isEqualTo(expected); + assertThat(actual).isEqualTo(expected); } @Test public void testBatchingWithDelay() throws Exception { - AccumulatingBatchReceiver receiver = new AccumulatingBatchReceiver<>(); + AccumulatingBatchReceiver receiver = + new AccumulatingBatchReceiver<>(ApiFutures.immediateFuture(null)); ThresholdBatcher batcher = createSimpleBatcherBuidler(receiver).setMaxDelay(Duration.ofMillis(100)).build(); @@ -197,7 +204,7 @@ public void testBatchingWithDelay() throws Exception { batcher.add(SimpleBatch.fromInteger(5)); // Give time for the delay to trigger and push the batch Thread.sleep(500); - Truth.assertThat(receiver.getBatches().size()).isEqualTo(1); + assertThat(receiver.getBatches()).hasSize(1); batcher.add(SimpleBatch.fromInteger(11)); @@ -208,7 +215,7 @@ public void testBatchingWithDelay() throws Exception { for (SimpleBatch batch : receiver.getBatches()) { actual.add(batch.getIntegers()); } - Truth.assertThat(actual).isEqualTo(expected); + assertThat(actual).isEqualTo(expected); } @Test @@ -218,14 +225,16 @@ public void testExceptionWithNullFlowController() { .setThresholds(BatchingThresholds.create(100)) .setExecutor(EXECUTOR) .setMaxDelay(Duration.ofMillis(10000)) - .setReceiver(new AccumulatingBatchReceiver()) + .setReceiver( + new AccumulatingBatchReceiver(ApiFutures.immediateFuture(null))) .setBatchMerger(new SimpleBatchMerger()) .build(); } @Test public void testBatchingWithFlowControl() throws Exception { - AccumulatingBatchReceiver receiver = new AccumulatingBatchReceiver<>(); + AccumulatingBatchReceiver receiver = + new AccumulatingBatchReceiver<>(ApiFutures.immediateFuture(null)); ThresholdBatcher batcher = createSimpleBatcherBuidler(receiver) .setThresholds(BatchingThresholds.create(2)) @@ -233,20 +242,20 @@ public void testBatchingWithFlowControl() throws Exception { getTrackedIntegerBatchingFlowController(2L, null, LimitExceededBehavior.Block)) .build(); - Truth.assertThat(trackedFlowController.getElementsReserved()).isEqualTo(0); - Truth.assertThat(trackedFlowController.getElementsReleased()).isEqualTo(0); - Truth.assertThat(trackedFlowController.getBytesReserved()).isEqualTo(0); - Truth.assertThat(trackedFlowController.getBytesReleased()).isEqualTo(0); + assertThat(trackedFlowController.getElementsReserved()).isEqualTo(0); + assertThat(trackedFlowController.getElementsReleased()).isEqualTo(0); + assertThat(trackedFlowController.getBytesReserved()).isEqualTo(0); + assertThat(trackedFlowController.getBytesReleased()).isEqualTo(0); batcher.add(SimpleBatch.fromInteger(3)); batcher.add(SimpleBatch.fromInteger(5)); batcher.add( SimpleBatch.fromInteger(7)); // We expect to block here until the first batch is handled - Truth.assertThat(receiver.getBatches().size()).isEqualTo(1); + assertThat(receiver.getBatches()).hasSize(1); batcher.add(SimpleBatch.fromInteger(9)); batcher.add( SimpleBatch.fromInteger(11)); // We expect to block here until the second batch is handled - Truth.assertThat(receiver.getBatches().size()).isEqualTo(2); + assertThat(receiver.getBatches()).hasSize(2); batcher.pushCurrentBatch().get(); @@ -256,17 +265,18 @@ public void testBatchingWithFlowControl() throws Exception { for (SimpleBatch batch : receiver.getBatches()) { actual.add(batch.getIntegers()); } - Truth.assertThat(actual).isEqualTo(expected); + assertThat(actual).isEqualTo(expected); - Truth.assertThat(trackedFlowController.getElementsReserved()) + assertThat(trackedFlowController.getElementsReserved()) .isEqualTo(trackedFlowController.getElementsReleased()); - Truth.assertThat(trackedFlowController.getBytesReserved()) + assertThat(trackedFlowController.getBytesReserved()) .isEqualTo(trackedFlowController.getBytesReleased()); } @Test public void testBatchingFlowControlExceptionRecovery() throws Exception { - AccumulatingBatchReceiver receiver = new AccumulatingBatchReceiver<>(); + AccumulatingBatchReceiver receiver = + new AccumulatingBatchReceiver<>(ApiFutures.immediateFuture(null)); ThresholdBatcher batcher = createSimpleBatcherBuidler(receiver) .setThresholds(BatchingThresholds.create(4)) @@ -275,21 +285,21 @@ public void testBatchingFlowControlExceptionRecovery() throws Exception { 3L, null, LimitExceededBehavior.ThrowException)) .build(); - Truth.assertThat(trackedFlowController.getElementsReserved()).isEqualTo(0); - Truth.assertThat(trackedFlowController.getElementsReleased()).isEqualTo(0); - Truth.assertThat(trackedFlowController.getBytesReserved()).isEqualTo(0); - Truth.assertThat(trackedFlowController.getBytesReleased()).isEqualTo(0); + assertThat(trackedFlowController.getElementsReserved()).isEqualTo(0); + assertThat(trackedFlowController.getElementsReleased()).isEqualTo(0); + assertThat(trackedFlowController.getBytesReserved()).isEqualTo(0); + assertThat(trackedFlowController.getBytesReleased()).isEqualTo(0); batcher.add(SimpleBatch.fromInteger(3)); batcher.add(SimpleBatch.fromInteger(5)); batcher.add(SimpleBatch.fromInteger(7)); try { batcher.add(SimpleBatch.fromInteger(9)); - Truth.assertWithMessage("Failing: expected exception").that(false).isTrue(); + Assert.fail("expected exception"); } catch (FlowControlException e) { } batcher.pushCurrentBatch().get(); - Truth.assertThat(receiver.getBatches().size()).isEqualTo(1); + assertThat(receiver.getBatches()).hasSize(1); batcher.add(SimpleBatch.fromInteger(11)); batcher.add(SimpleBatch.fromInteger(13)); batcher.pushCurrentBatch().get(); @@ -299,11 +309,52 @@ public void testBatchingFlowControlExceptionRecovery() throws Exception { for (SimpleBatch batch : receiver.getBatches()) { actual.add(batch.getIntegers()); } - Truth.assertThat(actual).isEqualTo(expected); + assertThat(actual).isEqualTo(expected); + + assertThat(trackedFlowController.getElementsReserved()) + .isEqualTo(trackedFlowController.getElementsReleased()); + assertThat(trackedFlowController.getBytesReserved()) + .isEqualTo(trackedFlowController.getBytesReleased()); + } + + @Test + public void testBatchingFailedRPC() throws Exception { + Exception ex = new IllegalStateException("does nothing, unsuccessfully"); + AccumulatingBatchReceiver receiver = + new AccumulatingBatchReceiver<>(ApiFutures.immediateFailedFuture(ex)); + ThresholdBatcher batcher = + createSimpleBatcherBuidler(receiver) + .setThresholds(BatchingThresholds.create(4)) + .setFlowController( + getTrackedIntegerBatchingFlowController( + 3L, null, LimitExceededBehavior.ThrowException)) + .build(); + + assertThat(trackedFlowController.getElementsReserved()).isEqualTo(0); + assertThat(trackedFlowController.getElementsReleased()).isEqualTo(0); + assertThat(trackedFlowController.getBytesReserved()).isEqualTo(0); + assertThat(trackedFlowController.getBytesReleased()).isEqualTo(0); + + batcher.add(SimpleBatch.fromInteger(3)); + try { + batcher.pushCurrentBatch().get(); + Assert.fail("expected exception"); + } catch (Exception e) { + assertThat(e).isInstanceOf(ExecutionException.class); + assertThat(e).hasCauseThat().isSameAs(ex); + } + assertThat(receiver.getBatches()).hasSize(1); + + List> expected = Arrays.asList(Arrays.asList(3)); + List> actual = new ArrayList<>(); + for (SimpleBatch batch : receiver.getBatches()) { + actual.add(batch.getIntegers()); + } + assertThat(actual).isEqualTo(expected); - Truth.assertThat(trackedFlowController.getElementsReserved()) + assertThat(trackedFlowController.getElementsReserved()) .isEqualTo(trackedFlowController.getElementsReleased()); - Truth.assertThat(trackedFlowController.getBytesReserved()) + assertThat(trackedFlowController.getBytesReserved()) .isEqualTo(trackedFlowController.getBytesReleased()); } }