Skip to content

Commit

Permalink
Adding resume publish. (#5046)
Browse files Browse the repository at this point in the history
* Adding resume publish.

* Addressing comments
  • Loading branch information
sduskis committed May 2, 2019
1 parent f6fcbed commit c918164
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,29 +148,21 @@ private Publisher(Builder builder) throws IOException {
.setTotalTimeout(Duration.ofNanos(Long.MAX_VALUE));
}

Set<StatusCode.Code> retryCodes;
if (enableMessageOrdering) {
retryCodes = EnumSet.allOf(StatusCode.Code.class);
} else {
retryCodes =
EnumSet.of(
StatusCode.Code.ABORTED,
StatusCode.Code.CANCELLED,
StatusCode.Code.DEADLINE_EXCEEDED,
StatusCode.Code.INTERNAL,
StatusCode.Code.RESOURCE_EXHAUSTED,
StatusCode.Code.UNKNOWN,
StatusCode.Code.UNAVAILABLE);
}

PublisherStubSettings.Builder stubSettings =
PublisherStubSettings.newBuilder()
.setCredentialsProvider(builder.credentialsProvider)
.setExecutorProvider(FixedExecutorProvider.create(executor))
.setTransportChannelProvider(builder.channelProvider);
stubSettings
.publishSettings()
.setRetryableCodes(retryCodes)
.setRetryableCodes(EnumSet.of(
StatusCode.Code.ABORTED,
StatusCode.Code.CANCELLED,
StatusCode.Code.DEADLINE_EXCEEDED,
StatusCode.Code.INTERNAL,
StatusCode.Code.RESOURCE_EXHAUSTED,
StatusCode.Code.UNKNOWN,
StatusCode.Code.UNAVAILABLE))
.setRetrySettings(retrySettingsBuilder.build())
.setBatchingSettings(BatchingSettings.newBuilder().setIsEnabled(false).build());
this.publisherStub = GrpcPublisherStub.create(stubSettings.build());
Expand Down Expand Up @@ -263,6 +255,18 @@ public void run() {
return outstandingPublish.publishResult;
}

/**
* There may be non-recoverable problems with a request for an ordering key. In that case, all
* subsequent requests will fail until this method is called. If the key is not currently paused,
* calling this method will be a no-op.
*
* @param key The key for which to resume publishing.
*/
public void resumePublish(String key) {
Preconditions.checkState(!shutdown.get(), "Cannot publish on a shut-down publisher.");
sequentialExecutor.resumePublish(key);
}

private void setupAlarm() {
if (!messagesBatches.isEmpty()) {
if (!activeAlarm.getAndSet(true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,14 @@
import com.google.api.core.ApiFutures;
import com.google.api.core.BetaApi;
import com.google.api.core.SettableApiFuture;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -147,6 +151,8 @@ static class CallbackExecutor extends SequentialExecutor<CancellableRunnable> {
new CancellationException(
"Execution cancelled because executing previous runnable failed.");

private final Set<String> keysWithErrors = Collections.synchronizedSet(new HashSet<String>());

CallbackExecutor(Executor executor) {
super(executor);
}
Expand Down Expand Up @@ -186,6 +192,11 @@ <T> ApiFuture<T> submit(final String key, final Callable<ApiFuture<T>> callable)
// Step 1: create a future for the user
final SettableApiFuture<T> future = SettableApiFuture.create();

if (keysWithErrors.contains(key)) {
future.setException(CANCELLATION_EXCEPTION);
return future;
}

// Step 2: create the CancellableRunnable
// Step 3: add the task to queue via `execute`
CancellableRunnable task =
Expand Down Expand Up @@ -213,6 +224,7 @@ public void onSuccess(T msg) {
// Step 5.2: on failure
@Override
public void onFailure(Throwable e) {
keysWithErrors.add(key);
future.setException(e);
cancelQueuedTasks(key, CANCELLATION_EXCEPTION);
}
Expand All @@ -233,7 +245,11 @@ public void cancel(Throwable e) {
return future;
}

/** Cancels every task in the queue assoicated with {@code key}. */
void resumePublish(String key) {
keysWithErrors.remove(key);
}

/** Cancels every task in the queue associated with {@code key}. */
private void cancelQueuedTasks(final String key, Throwable e) {
// TODO(kimkyung-goog): Ensure execute() fails once cancelQueueTasks() has been ever invoked,
// so that no more tasks are scheduled.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.concurrent.TimeUnit;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -443,6 +444,95 @@ public void testEnableMessageOrdering_dontSendWhileInflight() throws Exception {
publisher.shutdown();
}

@Test
/**
* Make sure that resume publishing works as expected:
*
* <ol>
* <li> publish with key orderA which returns a failure.</li>
* <li> publish with key orderA again, which should fail immediately</li>
* <li> publish with key orderB, which should succeed</li>
* <li> resume publishing on key orderA</li>
* <li> publish with key orderA, which should now succeed</li>
* </ol>
*
*/
public void testResumePublish() throws Exception {
Publisher publisher =
getTestPublisherBuilder()
.setBatchingSettings(
Publisher.Builder.DEFAULT_BATCHING_SETTINGS
.toBuilder()
.setElementCountThreshold(2L)
.build())
.setEnableMessageOrdering(true)
.build();

ApiFuture<String> future1 = sendTestMessageWithOrderingKey(publisher, "m1", "orderA");
ApiFuture<String> future2 = sendTestMessageWithOrderingKey(publisher, "m2", "orderA");

fakeExecutor.advanceTime(Duration.ZERO);
assertFalse(future1.isDone());
assertFalse(future2.isDone());

// This exception should stop future publishing to the same key
testPublisherServiceImpl.addPublishError(new StatusException(Status.INVALID_ARGUMENT));

fakeExecutor.advanceTime(Duration.ZERO);

try {
future1.get();
Assert.fail("This should fail.");
} catch (ExecutionException e) {
}

try {
future2.get();
Assert.fail("This should fail.");
} catch (ExecutionException e) {
}

// Submit new requests with orderA that should fail.
ApiFuture<String> future3 = sendTestMessageWithOrderingKey(publisher, "m3", "orderA");
ApiFuture<String> future4 = sendTestMessageWithOrderingKey(publisher, "m4", "orderA");

try {
future3.get();
Assert.fail("This should fail.");
} catch (ExecutionException e) {
assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause());
}

try {
future4.get();
Assert.fail("This should fail.");
} catch (ExecutionException e) {
assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause());
}

// Submit a new request with orderB, which should succeed
ApiFuture<String> future5 = sendTestMessageWithOrderingKey(publisher, "m5", "orderB");
ApiFuture<String> future6 = sendTestMessageWithOrderingKey(publisher, "m6", "orderB");

testPublisherServiceImpl.addPublishResponse(PublishResponse.newBuilder().addMessageIds("5").addMessageIds("6"));

Assert.assertEquals("5", future5.get());
Assert.assertEquals("6", future6.get());

// Resume publishing of "orderA", which should now succeed
publisher.resumePublish("orderA");

ApiFuture<String> future7 = sendTestMessageWithOrderingKey(publisher, "m7", "orderA");
ApiFuture<String> future8 = sendTestMessageWithOrderingKey(publisher, "m8", "orderA");

testPublisherServiceImpl.addPublishResponse(PublishResponse.newBuilder().addMessageIds("7").addMessageIds("8"));

Assert.assertEquals("7", future7.get());
Assert.assertEquals("8", future8.get());

publisher.shutdown();
}

private ApiFuture<String> sendTestMessageWithOrderingKey(
Publisher publisher, String data, String orderingKey) {
return publisher.publish(
Expand Down

0 comments on commit c918164

Please sign in to comment.