> asyncTasks;
+}
diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java
index ff989484..1aaa2340 100644
--- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java
+++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java
@@ -11,7 +11,8 @@
import com.xiaomi.infra.pegasus.rpc.ReplicationException;
import com.xiaomi.infra.pegasus.rpc.Table;
import com.xiaomi.infra.pegasus.tools.Tools;
-import io.netty.util.concurrent.*;
+import io.netty.util.concurrent.DefaultPromise;
+import io.netty.util.concurrent.Future;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ExecutionException;
@@ -26,12 +27,10 @@
* Implementation of {@link PegasusTableInterface}.
*/
public class PegasusTable implements PegasusTableInterface {
- private PegasusClient client;
private Table table;
private int defaultTimeout;
public PegasusTable(PegasusClient client, Table table) {
- this.client = client;
this.table = table;
this.defaultTimeout = table.getDefaultTimeout();
}
@@ -92,7 +91,7 @@ public void onCompletion(client_operator clientOP) {
}
@Override
- public Future asyncGet(byte[] hashKey, byte[] sortKey, int timeout /*ms*/) {
+ public Future asyncGet(byte[] hashKey, byte[] sortKey, int timeout /* ms */) {
final DefaultPromise promise = table.newPromise();
blob request = new blob(PegasusClient.generateKey(hashKey, sortKey));
long partitionHash = table.getHash(request.data);
@@ -122,7 +121,7 @@ public void onCompletion(client_operator clientOP) {
@Override
public Future asyncSet(
- byte[] hashKey, byte[] sortKey, byte[] value, int ttlSeconds, int timeout /*ms*/) {
+ byte[] hashKey, byte[] sortKey, byte[] value, int ttlSeconds, int timeout /* ms */) {
final DefaultPromise promise = table.newPromise();
if (value == null) {
promise.setFailure(new PException("Invalid parameter: value should not be null"));
@@ -273,7 +272,7 @@ public Future asyncMultiGet(
MultiGetOptions options,
int maxFetchCount,
int maxFetchSize,
- int timeout /*ms*/) {
+ int timeout /* ms */) {
final DefaultPromise promise = table.newPromise();
if (hashKey == null || hashKey.length == 0) {
promise.setFailure(new PException("Invalid parameter: hashKey should not be null or empty"));
@@ -342,7 +341,7 @@ public Future asyncMultiGet(
byte[] startSortKey,
byte[] stopSortKey,
MultiGetOptions options,
- int timeout /*ms*/) {
+ int timeout /* ms */) {
return asyncMultiGet(hashKey, startSortKey, stopSortKey, options, 100, 1000000, timeout);
}
@@ -1166,18 +1165,12 @@ public void batchSet(List items, int timeout) throws PException {
if (items == null) {
throw new PException("Invalid parameter: items should not be null");
}
- List> futures = new ArrayList>();
+ if (timeout <= 0) timeout = defaultTimeout;
+ FutureGroup group = new FutureGroup<>(items.size());
for (SetItem i : items) {
- futures.add(asyncSet(i.hashKey, i.sortKey, i.value, i.ttlSeconds, timeout));
- }
- for (int i = 0; i < items.size(); i++) {
- Future fu = futures.get(i);
- fu.awaitUninterruptibly();
- if (!fu.isSuccess()) {
- Throwable cause = fu.cause();
- throw new PException("Set value of items[" + i + "] failed: " + cause.getMessage(), cause);
- }
+ group.add(asyncSet(i.hashKey, i.sortKey, i.value, i.ttlSeconds, timeout));
}
+ group.waitAllCompleteOrOneFail(timeout);
}
@Override
diff --git a/src/test/java/com/xiaomi/infra/pegasus/client/TestFutureGroup.java b/src/test/java/com/xiaomi/infra/pegasus/client/TestFutureGroup.java
new file mode 100644
index 00000000..02a14e99
--- /dev/null
+++ b/src/test/java/com/xiaomi/infra/pegasus/client/TestFutureGroup.java
@@ -0,0 +1,67 @@
+// Copyright (c) 2017, Xiaomi, Inc. All rights reserved.
+// This source code is licensed under the Apache License Version 2.0, which
+// can be found in the LICENSE file in the root directory of this source tree.
+package com.xiaomi.infra.pegasus.client;
+
+import io.netty.util.concurrent.Promise;
+import io.netty.util.concurrent.SingleThreadEventExecutor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestFutureGroup {
+
+ private static final class TestEventExecutor extends SingleThreadEventExecutor {
+ TestEventExecutor() {
+ super(null, Executors.defaultThreadFactory(), false);
+ }
+
+ @Override
+ protected void run() {
+ while (!confirmShutdown()) {
+ Runnable task = takeTask();
+ if (task != null) {
+ task.run();
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testBlockingOperationException() throws Exception {
+ // ensure pegasus client will throw PException when BlockingOperationException is thrown.
+
+ TestEventExecutor executor = new TestEventExecutor();
+ Promise promise = executor.newPromise();
+ AtomicBoolean executed = new AtomicBoolean(false);
+ AtomicBoolean success = new AtomicBoolean(true);
+
+ executor.execute(
+ () -> {
+ // A background thread waiting for promise to complete.
+ FutureGroup group = new FutureGroup<>(1);
+ group.add(promise);
+ try {
+ group.waitAllCompleteOrOneFail(10000);
+ } catch (PException e) {
+ success.set(false);
+ System.err.println("TestFutureGroup.testInterrupt: " + e.toString());
+ }
+ executed.set(true);
+ });
+
+ while (executor.pendingTasks() != 0) {
+ Thread.sleep(100);
+ }
+
+ promise.setSuccess(null);
+
+ // block until the background thread finished.
+ while (!executed.get()) {
+ Thread.sleep(100);
+ }
+
+ Assert.assertFalse(success.get());
+ }
+}