Skip to content
This repository has been archived by the owner on May 10, 2022. It is now read-only.

Commit

Permalink
bugfix: catch and rethrow BlockingOperationException for batchSet (#51)
Browse files Browse the repository at this point in the history
  • Loading branch information
Wu Tao authored and neverchanje committed Sep 24, 2019
1 parent 92a82c5 commit 4e40054
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 19 deletions.
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
dist: trusty

language: java

jdk:
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>1.6</source>
<target>1.6</target>
<source>1.8</source>
<target>1.8</target>
<fork>true</fork>
<verbose>true</verbose>
<encoding>UTF-8</encoding>
Expand Down
46 changes: 46 additions & 0 deletions src/main/java/com/xiaomi/infra/pegasus/client/FutureGroup.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright (c) 2017, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.
package com.xiaomi.infra.pegasus.client;

import io.netty.util.concurrent.Future;
import java.util.ArrayList;
import java.util.List;

final class FutureGroup<Result> {

FutureGroup(int initialCapacity) {
asyncTasks = new ArrayList<>(initialCapacity);
}

public void add(Future<Result> task) {
asyncTasks.add(task);
}

void waitAllCompleteOrOneFail(int timeoutMillis) throws PException {
waitAllCompleteOrOneFail(null, timeoutMillis);
}

// Waits until all future tasks complete but terminate if one fails.
// `results` is nullable
void waitAllCompleteOrOneFail(List<Result> results, int timeoutMillis) throws PException {
for (int i = 0; i < asyncTasks.size(); i++) {
Future<Result> fu = asyncTasks.get(i);
try {
fu.await(timeoutMillis);
} catch (Exception e) {
throw new PException("async task #[" + i + "] await failed: " + e.toString());
}
if (fu.isSuccess()) {
if (results != null) {
results.set(i, fu.getNow());
}
} else {
Throwable cause = fu.cause();
throw new PException("async task #[" + i + "] failed: " + cause.getMessage(), cause);
}
}
}

private List<Future<Result>> asyncTasks;
}
27 changes: 10 additions & 17 deletions src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
import com.xiaomi.infra.pegasus.rpc.ReplicationException;
import com.xiaomi.infra.pegasus.rpc.Table;
import com.xiaomi.infra.pegasus.tools.Tools;
import io.netty.util.concurrent.*;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Future;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ExecutionException;
Expand All @@ -26,12 +27,10 @@
* <p>Implementation of {@link PegasusTableInterface}.
*/
public class PegasusTable implements PegasusTableInterface {
private PegasusClient client;
private Table table;
private int defaultTimeout;

public PegasusTable(PegasusClient client, Table table) {
this.client = client;
this.table = table;
this.defaultTimeout = table.getDefaultTimeout();
}
Expand Down Expand Up @@ -92,7 +91,7 @@ public void onCompletion(client_operator clientOP) {
}

@Override
public Future<byte[]> asyncGet(byte[] hashKey, byte[] sortKey, int timeout /*ms*/) {
public Future<byte[]> asyncGet(byte[] hashKey, byte[] sortKey, int timeout /* ms */) {
final DefaultPromise<byte[]> promise = table.newPromise();
blob request = new blob(PegasusClient.generateKey(hashKey, sortKey));
long partitionHash = table.getHash(request.data);
Expand Down Expand Up @@ -122,7 +121,7 @@ public void onCompletion(client_operator clientOP) {

@Override
public Future<Void> asyncSet(
byte[] hashKey, byte[] sortKey, byte[] value, int ttlSeconds, int timeout /*ms*/) {
byte[] hashKey, byte[] sortKey, byte[] value, int ttlSeconds, int timeout /* ms */) {
final DefaultPromise<Void> promise = table.newPromise();
if (value == null) {
promise.setFailure(new PException("Invalid parameter: value should not be null"));
Expand Down Expand Up @@ -273,7 +272,7 @@ public Future<MultiGetResult> asyncMultiGet(
MultiGetOptions options,
int maxFetchCount,
int maxFetchSize,
int timeout /*ms*/) {
int timeout /* ms */) {
final DefaultPromise<MultiGetResult> promise = table.newPromise();
if (hashKey == null || hashKey.length == 0) {
promise.setFailure(new PException("Invalid parameter: hashKey should not be null or empty"));
Expand Down Expand Up @@ -342,7 +341,7 @@ public Future<MultiGetResult> asyncMultiGet(
byte[] startSortKey,
byte[] stopSortKey,
MultiGetOptions options,
int timeout /*ms*/) {
int timeout /* ms */) {
return asyncMultiGet(hashKey, startSortKey, stopSortKey, options, 100, 1000000, timeout);
}

Expand Down Expand Up @@ -1166,18 +1165,12 @@ public void batchSet(List<SetItem> items, int timeout) throws PException {
if (items == null) {
throw new PException("Invalid parameter: items should not be null");
}
List<Future<Void>> futures = new ArrayList<Future<Void>>();
if (timeout <= 0) timeout = defaultTimeout;
FutureGroup<Void> group = new FutureGroup<>(items.size());
for (SetItem i : items) {
futures.add(asyncSet(i.hashKey, i.sortKey, i.value, i.ttlSeconds, timeout));
}
for (int i = 0; i < items.size(); i++) {
Future<Void> fu = futures.get(i);
fu.awaitUninterruptibly();
if (!fu.isSuccess()) {
Throwable cause = fu.cause();
throw new PException("Set value of items[" + i + "] failed: " + cause.getMessage(), cause);
}
group.add(asyncSet(i.hashKey, i.sortKey, i.value, i.ttlSeconds, timeout));
}
group.waitAllCompleteOrOneFail(timeout);
}

@Override
Expand Down
67 changes: 67 additions & 0 deletions src/test/java/com/xiaomi/infra/pegasus/client/TestFutureGroup.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright (c) 2017, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.
package com.xiaomi.infra.pegasus.client;

import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.SingleThreadEventExecutor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Assert;
import org.junit.Test;

public class TestFutureGroup {

private static final class TestEventExecutor extends SingleThreadEventExecutor {
TestEventExecutor() {
super(null, Executors.defaultThreadFactory(), false);
}

@Override
protected void run() {
while (!confirmShutdown()) {
Runnable task = takeTask();
if (task != null) {
task.run();
}
}
}
}

@Test
public void testBlockingOperationException() throws Exception {
// ensure pegasus client will throw PException when BlockingOperationException is thrown.

TestEventExecutor executor = new TestEventExecutor();
Promise<Void> promise = executor.newPromise();
AtomicBoolean executed = new AtomicBoolean(false);
AtomicBoolean success = new AtomicBoolean(true);

executor.execute(
() -> {
// A background thread waiting for promise to complete.
FutureGroup<Void> group = new FutureGroup<>(1);
group.add(promise);
try {
group.waitAllCompleteOrOneFail(10000);
} catch (PException e) {
success.set(false);
System.err.println("TestFutureGroup.testInterrupt: " + e.toString());
}
executed.set(true);
});

while (executor.pendingTasks() != 0) {
Thread.sleep(100);
}

promise.setSuccess(null);

// block until the background thread finished.
while (!executed.get()) {
Thread.sleep(100);
}

Assert.assertFalse(success.get());
}
}

0 comments on commit 4e40054

Please sign in to comment.