Skip to content

Commit

Permalink
nfsv42: update OperationCOPY to support async copy
Browse files Browse the repository at this point in the history
Motivation:
For a big files client might request an async copy. Moreover, the server
itself might decide to switch to async mode if requests takes too long.
Thus the server should be ready to notify the client when copy is complete.

Modification:
Update OperationCOPY to (a) use async copy if requests or switch to
async mode if copyFileRange syscall takes too long.

Result:
async behavior for server-side-copy

Acked-by: Paul Millar
Target: master
  • Loading branch information
kofemann committed Jan 17, 2022
1 parent 20968d2 commit 5541044
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 31 deletions.
4 changes: 2 additions & 2 deletions core/src/main/java/org/dcache/nfs/v4/ClientCB.java
Original file line number Diff line number Diff line change
Expand Up @@ -265,15 +265,15 @@ public void cbDeleteDevice(deviceid4 id) throws OncRpcException, IOException {
}
}

public void cbOffload(nfs_fh4 fh, stateid4 stateid, write_response4 response) throws OncRpcException, IOException {
public void cbOffload(nfs_fh4 fh, stateid4 stateid, write_response4 response, int status) throws OncRpcException, IOException {

CB_OFFLOAD4args copyOffload = new CB_OFFLOAD4args();

copyOffload.coa_fh = fh;
copyOffload.coa_stateid = stateid;
copyOffload.coa_offload_info = new offload_info4();
copyOffload.coa_offload_info.coa_resok4 = response;
copyOffload.coa_offload_info.coa_status = nfsstat.NFS_OK;
copyOffload.coa_offload_info.coa_status = status;

nfs_cb_argop4 opArgs = new nfs_cb_argop4();
opArgs.argop = nfs_cb_opnum4.OP_CB_OFFLOAD;
Expand Down
108 changes: 79 additions & 29 deletions core/src/main/java/org/dcache/nfs/v4/OperationCOPY.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 Deutsches Elektronen-Synchroton,
* Copyright (c) 2021 - 2022 Deutsches Elektronen-Synchroton,
* Member of the Helmholtz Association, (DESY), HAMBURG, GERMANY
*
* This library is free software; you can redistribute it and/or modify
Expand All @@ -20,11 +20,13 @@
package org.dcache.nfs.v4;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.google.common.annotations.Beta;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.common.base.Throwables;
import org.dcache.nfs.ChimeraNFSException;
import org.dcache.nfs.nfsstat;
import org.dcache.nfs.status.NotSuppException;
Expand All @@ -35,10 +37,12 @@
import org.dcache.nfs.v4.xdr.length4;
import org.dcache.nfs.v4.xdr.nfs4_prot;
import org.dcache.nfs.v4.xdr.nfs_argop4;
import org.dcache.nfs.v4.xdr.nfs_fh4;
import org.dcache.nfs.v4.xdr.nfs_opnum4;
import org.dcache.nfs.v4.xdr.nfs_resop4;
import org.dcache.nfs.v4.xdr.stable_how4;
import org.dcache.nfs.v4.xdr.stateid4;
import org.dcache.nfs.v4.xdr.verifier4;
import org.dcache.nfs.v4.xdr.write_response4;
import org.dcache.nfs.vfs.Inode;
import org.slf4j.Logger;
Expand Down Expand Up @@ -66,9 +70,13 @@ public void process(CompoundContext context, nfs_resop4 result) throws ChimeraNF
throw new NotSuppException("Inter-server copy is not supported");
}

// only synchronous mode is supported (for now)
if (!_args.opcopy.ca_consecutive || !_args.opcopy.ca_synchronous) {
throw new NotSuppException();
// only consecutive copy is supported
if (!_args.opcopy.ca_consecutive) {
res.cr_requirements = new copy_requirements4();
res.cr_requirements.cr_consecutive = true;
res.cr_requirements.cr_synchronous = _args.opcopy.ca_synchronous;
res.cr_status = nfsstat.NFS4ERR_OFFLOAD_NO_REQS;
return;
}

Inode srcInode = context.savedInode();
Expand All @@ -91,31 +99,73 @@ public void process(CompoundContext context, nfs_resop4 result) throws ChimeraNF
throw new OpenModeException("Invalid destination inode open mode (required write)");
}

Future<Long> copyFuture = context.getFs().copyFileRange(srcInode, srcPos, dstInode, dstPos, len);

try {
long n = Uninterruptibles.getUninterruptibly(copyFuture);

res.cr_resok4 = new COPY4resok();
res.cr_resok4.cr_response = new write_response4();
res.cr_resok4.cr_response.wr_callback_id = new stateid4[0];
res.cr_resok4.cr_response.wr_committed = stable_how4.FILE_SYNC4;
res.cr_resok4.cr_response.wr_count = new length4(n);
res.cr_resok4.cr_response.wr_writeverf = context.getRebootVerifier();

res.cr_resok4.cr_requirements = new copy_requirements4();
res.cr_resok4.cr_requirements.cr_consecutive = true;
res.cr_resok4.cr_requirements.cr_synchronous = true;

res.cr_status = nfsstat.NFS_OK;
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof ChimeraNFSException) {
res.cr_status = ((ChimeraNFSException) cause).getStatus();
} else {
LOGGER.warn("Unexpected error during copyFileRange", e);
res.cr_resok4 = new COPY4resok();
res.cr_resok4.cr_response = new write_response4();
res.cr_resok4.cr_response.wr_writeverf = context.getRebootVerifier();
res.cr_resok4.cr_response.wr_callback_id = new stateid4[]{};
res.cr_resok4.cr_response.wr_committed = stable_how4.FILE_SYNC4;
res.cr_resok4.cr_response.wr_count = new length4(0);

res.cr_resok4.cr_requirements = new copy_requirements4();
res.cr_resok4.cr_requirements.cr_consecutive = true;
res.cr_status = nfsstat.NFS_OK;

CompletableFuture<Long> copyFuture = context.getFs().copyFileRange(srcInode, srcPos, dstInode, dstPos, len);
boolean isSync = _args.opcopy.ca_synchronous;
if (isSync) {
try {
// try sync copy and fall-back to async
long n = copyFuture.get(1, TimeUnit.SECONDS);
res.cr_resok4.cr_response.wr_count = new length4(n);
} catch (InterruptedException | ExecutionException e) {
LOGGER.error("Copy-offload interrupted: ", Throwables.getRootCause(e));
res.cr_status = nfsstat.NFSERR_IO;
} catch (TimeoutException e) {
// continue as async copy
isSync = false;
}
}

if (!isSync) {
// copy asynchronously
var copyState = notifyWhenComplete(client, dstInode, context.getRebootVerifier(), copyFuture);
res.cr_resok4.cr_response.wr_callback_id = new stateid4[]{copyState};
}
res.cr_resok4.cr_requirements.cr_synchronous = isSync;
}

private stateid4 notifyWhenComplete(NFS4Client client, Inode dstInode, verifier4 verifier, CompletableFuture<Long> copyFuture) throws ChimeraNFSException {
var openState = client.state(_args.opcopy.ca_src_stateid);
var copyState = client.createState(openState.getStateOwner(), openState).stateid();

copyFuture.handle((n, t) -> {

var cr_response = new write_response4();
cr_response.wr_callback_id = new stateid4[]{};
cr_response.wr_committed = stable_how4.FILE_SYNC4;
cr_response.wr_count = new length4(n);
cr_response.wr_writeverf = verifier;

try {
client.getCB().cbOffload(new nfs_fh4(dstInode.toNfsHandle()), copyState, cr_response, toNfsState(t));
} catch (IOException ex) {
LOGGER.warn("Failed to notify client about copy-offload completion: {}", ex.getMessage());
}

return null;
});

return copyState;
}

private int toNfsState(Throwable t) {

if (t == null) {
return nfsstat.NFS_OK;
}

// FIXME: we need some mapping between 'well known' exceptions and nfs error states.
LOGGER.warn("Copy-offload failed with exception: {}", Throwables.getRootCause(t).toString());
return nfsstat.NFSERR_IO;
}
}

0 comments on commit 5541044

Please sign in to comment.