Skip to content

Commit

Permalink
rpc: expose CompletableFuture for RpcCall#call method
Browse files Browse the repository at this point in the history
Motivation:
CompletableFuture is a building block for async service. As RpcCall class
already returns CompletableFuture, but it doesn't explicitly tells it.

Modification:
update signature of RpcCall#call methods to define return type as CompletableFuture.

Result:
Rpc clients can use full power of CompletableFuture.

Acked-by: Marina Sahakyan
Target: master
  • Loading branch information
kofemann committed Jul 23, 2018
1 parent 1e38f5c commit fe4cf32
Showing 1 changed file with 26 additions and 6 deletions.
32 changes: 26 additions & 6 deletions oncrpc4j-core/src/main/java/org/dcache/oncrpc4j/rpc/RpcCall.java
Original file line number Diff line number Diff line change
Expand Up @@ -494,12 +494,12 @@ public void failed(Throwable t, InetSocketAddress attachment) {
* @param args The argument of the procedure.
* @param type The expected type of the reply
* @param auth auth to use for the call
* @return A Future representing the result of the operation.
* @return A CompletableFuture representing the result of the operation.
* @throws OncRpcException
* @throws IOException
* @since 2.4.0
*/
public <T extends XdrAble> Future<T> call(int procedure, XdrAble args, final Class<T> type, final RpcAuth auth)
public <T extends XdrAble> CompletableFuture<T> call(int procedure, XdrAble args, final Class<T> type, final RpcAuth auth)
throws IOException {
try {
T result = type.getDeclaredConstructor().newInstance();
Expand All @@ -511,9 +511,29 @@ public <T extends XdrAble> Future<T> call(int procedure, XdrAble args, final Cla
}

/**
* convenience version of {@link #call(int, XdrAble, Class, RpcAuth)} with no auth
*/
public <T extends XdrAble> Future<T> call(int procedure, XdrAble args, final Class<T> type)

/**
* Send asynchronous RPC request to a remove server.
*
* This method initiates an asynchronous RPC request. The method behaves in
* exactly the same manner as the {@link #call(int, XdrAble, CompletionHandler, long, TimeUnit)}
* method except that instead of specifying a completion handler, this method
* returns a CompletableFuture representing the pending result. The Future's get method
* returns the RPC reply responded by server.
*
* Convenience version of {@link #call(int, XdrAble, Class, RpcAuth)} with no auth
*
* @param <T> The result type of RPC call.
* @param procedure The number of the procedure.
* @param args The argument of the procedure.
* @param type The expected type of the reply
* @return A CompletableFuture representing the result of the operation.
* @throws OncRpcException
* @throws IOException
* @since 2.4.0
*/
public <T extends XdrAble> CompletableFuture<T> call(int procedure, XdrAble args, final Class<T> type)
throws IOException {
return call(procedure, args, type, null);
}
Expand Down Expand Up @@ -581,7 +601,7 @@ public void call(int procedure, XdrAble args, XdrAble result)
}
}

private <T extends XdrAble> Future<T> getCallFuture(int procedure, XdrAble args, final T result, long timeoutValue, TimeUnit timeoutUnits, RpcAuth auth)
private <T extends XdrAble> CompletableFuture<T> getCallFuture(int procedure, XdrAble args, final T result, long timeoutValue, TimeUnit timeoutUnits, RpcAuth auth)
throws IOException {

final CompletableFuture<T> future = new CompletableFuture<>();
Expand Down Expand Up @@ -609,7 +629,7 @@ public void failed(Throwable exc, RpcTransport attachment) {
return timeoutValue > 0 ? future : new TimeoutAwareFuture<>(future, xid);
}

private class TimeoutAwareFuture<T> implements Future<T> {
private class TimeoutAwareFuture<T> extends CompletableFuture<T> {
private final Future<T> delegate;
private final int xid;

Expand Down

0 comments on commit fe4cf32

Please sign in to comment.