Skip to content

Commit

Permalink
Added SFTP file transfer resume support on both PUT and GET. (#775)
Browse files Browse the repository at this point in the history
* Added SFTP file transfer resume support on both PUT and GET. Internally SFTPFileTransfer has a few sanity checks to fall back to full replacement even if the resume flag is set. 

SCP file transfers have not been changed to support this at this time.

* Added JUnit tests for issue-700

* Throw SCPException when attempting to resume SCP transfers.

* Licensing

* Small bug resuming a completed file was restarting since the bytes were equal.

* Enhanced test cases to validate the expected bytes transferred for each scenario are the actual bytes transferred.

* Removed author info which was pre-filled from company IDE template

* Added "fall through" comment for switch

* Changed the API for requesting a resume from a boolean flag with some internal decisions to be a user-specified long byte offset. This is cleaner but puts the onus on the caller to know exactly what they're asking for in their circumstance, which is ultimately better for a library like sshj.

* Reverted some now-unnecessary changes to SFTPFileTransfer.Uploader.prepareFile()

* Fix gradle exclude path for test files

Co-authored-by: Jeroen van Erp <jeroen@hierynomus.com>
  • Loading branch information
brenttyler and hierynomus authored May 27, 2022
1 parent d7e402c commit 3de0302
Show file tree
Hide file tree
Showing 12 changed files with 398 additions and 27 deletions.
7 changes: 6 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,12 @@ license {
mapping {
java = 'SLASHSTAR_STYLE'
}
excludes(['**/djb/Curve25519.java', '**/sshj/common/Base64.java', '**/com/hierynomus/sshj/userauth/keyprovider/bcrypt/*.java'])
excludes([
'**/djb/Curve25519.java',
'**/sshj/common/Base64.java',
'**/com/hierynomus/sshj/userauth/keyprovider/bcrypt/*.java',
'**/files/test_file_*.txt',
])
}

if (!JavaVersion.current().isJava9Compatible()) {
Expand Down
11 changes: 10 additions & 1 deletion src/main/java/net/schmizz/sshj/sftp/RemoteFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,16 @@ public int getLength() {
private boolean eof;

public ReadAheadRemoteFileInputStream(int maxUnconfirmedReads) {
this(maxUnconfirmedReads, 0L, -1L);
this(maxUnconfirmedReads, 0L);
}

/**
*
* @param maxUnconfirmedReads Maximum number of unconfirmed requests to send
* @param fileOffset Initial offset in file to read from
*/
public ReadAheadRemoteFileInputStream(int maxUnconfirmedReads, long fileOffset) {
this(maxUnconfirmedReads, fileOffset, -1L);
}

/**
Expand Down
20 changes: 20 additions & 0 deletions src/main/java/net/schmizz/sshj/sftp/SFTPClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -232,21 +232,41 @@ public void get(String source, String dest)
throws IOException {
xfer.download(source, dest);
}

public void get(String source, String dest, long byteOffset)
throws IOException {
xfer.download(source, dest, byteOffset);
}

public void put(String source, String dest)
throws IOException {
xfer.upload(source, dest);
}

public void put(String source, String dest, long byteOffset)
throws IOException {
xfer.upload(source, dest, byteOffset);
}

public void get(String source, LocalDestFile dest)
throws IOException {
xfer.download(source, dest);
}

public void get(String source, LocalDestFile dest, long byteOffset)
throws IOException {
xfer.download(source, dest, byteOffset);
}

public void put(LocalSourceFile source, String dest)
throws IOException {
xfer.upload(source, dest);
}

public void put(LocalSourceFile source, String dest, long byteOffset)
throws IOException {
xfer.upload(source, dest, byteOffset);
}

@Override
public void close()
Expand Down
83 changes: 61 additions & 22 deletions src/main/java/net/schmizz/sshj/sftp/SFTPFileTransfer.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,25 +50,47 @@ public void setPreserveAttributes(boolean preserveAttributes) {
@Override
public void upload(String source, String dest)
throws IOException {
upload(new FileSystemFile(source), dest);
upload(source, dest, 0);
}

@Override
public void upload(String source, String dest, long byteOffset)
throws IOException {
upload(new FileSystemFile(source), dest, byteOffset);
}

@Override
public void download(String source, String dest)
throws IOException {
download(source, new FileSystemFile(dest));
download(source, dest, 0);
}

@Override
public void download(String source, String dest, long byteOffset)
throws IOException {
download(source, new FileSystemFile(dest), byteOffset);
}

@Override
public void upload(LocalSourceFile localFile, String remotePath) throws IOException {
new Uploader(localFile, remotePath).upload(getTransferListener());
upload(localFile, remotePath, 0);
}

@Override
public void upload(LocalSourceFile localFile, String remotePath, long byteOffset) throws IOException {
new Uploader(localFile, remotePath).upload(getTransferListener(), byteOffset);
}

@Override
public void download(String source, LocalDestFile dest) throws IOException {
download(source, dest, 0);
}

@Override
public void download(String source, LocalDestFile dest, long byteOffset) throws IOException {
final PathComponents pathComponents = engine.getPathHelper().getComponents(source);
final FileAttributes attributes = engine.stat(source);
new Downloader().download(getTransferListener(), new RemoteResourceInfo(pathComponents, attributes), dest);
new Downloader().download(getTransferListener(), new RemoteResourceInfo(pathComponents, attributes), dest, byteOffset);
}

public void setUploadFilter(LocalFileFilter uploadFilter) {
Expand All @@ -92,7 +114,8 @@ private class Downloader {
@SuppressWarnings("PMD.MissingBreakInSwitch")
private void download(final TransferListener listener,
final RemoteResourceInfo remote,
final LocalDestFile local) throws IOException {
final LocalDestFile local,
final long byteOffset) throws IOException {
final LocalDestFile adjustedFile;
switch (remote.getAttributes().getType()) {
case DIRECTORY:
Expand All @@ -101,8 +124,9 @@ private void download(final TransferListener listener,
case UNKNOWN:
log.warn("Server did not supply information about the type of file at `{}` " +
"-- assuming it is a regular file!", remote.getPath());
// fall through
case REGULAR:
adjustedFile = downloadFile(listener.file(remote.getName(), remote.getAttributes().getSize()), remote, local);
adjustedFile = downloadFile(listener.file(remote.getName(), remote.getAttributes().getSize()), remote, local, byteOffset);
break;
default:
throw new IOException(remote + " is not a regular file or directory");
Expand All @@ -119,7 +143,7 @@ private LocalDestFile downloadDir(final TransferListener listener,
final RemoteDirectory rd = engine.openDir(remote.getPath());
try {
for (RemoteResourceInfo rri : rd.scan(getDownloadFilter()))
download(listener, rri, adjusted.getChild(rri.getName()));
download(listener, rri, adjusted.getChild(rri.getName()), 0); // not supporting individual byte offsets for these files
} finally {
rd.close();
}
Expand All @@ -128,13 +152,15 @@ private LocalDestFile downloadDir(final TransferListener listener,

private LocalDestFile downloadFile(final StreamCopier.Listener listener,
final RemoteResourceInfo remote,
final LocalDestFile local)
final LocalDestFile local,
final long byteOffset)
throws IOException {
final LocalDestFile adjusted = local.getTargetFile(remote.getName());
final RemoteFile rf = engine.open(remote.getPath());
try {
final RemoteFile.ReadAheadRemoteFileInputStream rfis = rf.new ReadAheadRemoteFileInputStream(16);
final OutputStream os = adjusted.getOutputStream();
log.debug("Attempting to download {} with offset={}", remote.getPath(), byteOffset);
final RemoteFile.ReadAheadRemoteFileInputStream rfis = rf.new ReadAheadRemoteFileInputStream(16, byteOffset);
final OutputStream os = adjusted.getOutputStream(byteOffset != 0);
try {
new StreamCopier(rfis, os, engine.getLoggerFactory())
.bufSize(engine.getSubsystem().getLocalMaxPacketSize())
Expand Down Expand Up @@ -173,17 +199,17 @@ private Uploader(final LocalSourceFile source, final String remote) {
this.remote = remote;
}

private void upload(final TransferListener listener) throws IOException {
private void upload(final TransferListener listener, long byteOffset) throws IOException {
if (source.isDirectory()) {
makeDirIfNotExists(remote); // Ensure that the directory exists
uploadDir(listener.directory(source.getName()), source, remote);
setAttributes(source, remote);
} else if (source.isFile() && isDirectory(remote)) {
String adjustedRemote = engine.getPathHelper().adjustForParent(this.remote, source.getName());
uploadFile(listener.file(source.getName(), source.getLength()), source, adjustedRemote);
uploadFile(listener.file(source.getName(), source.getLength()), source, adjustedRemote, byteOffset);
setAttributes(source, adjustedRemote);
} else if (source.isFile()) {
uploadFile(listener.file(source.getName(), source.getLength()), source, remote);
uploadFile(listener.file(source.getName(), source.getLength()), source, remote, byteOffset);
setAttributes(source, remote);
} else {
throw new IOException(source + " is not a file or directory");
Expand All @@ -192,13 +218,14 @@ private void upload(final TransferListener listener) throws IOException {

private void upload(final TransferListener listener,
final LocalSourceFile local,
final String remote)
final String remote,
final long byteOffset)
throws IOException {
final String adjustedPath;
if (local.isDirectory()) {
adjustedPath = uploadDir(listener.directory(local.getName()), local, remote);
} else if (local.isFile()) {
adjustedPath = uploadFile(listener.file(local.getName(), local.getLength()), local, remote);
adjustedPath = uploadFile(listener.file(local.getName(), local.getLength()), local, remote, byteOffset);
} else {
throw new IOException(local + " is not a file or directory");
}
Expand All @@ -217,22 +244,34 @@ private String uploadDir(final TransferListener listener,
throws IOException {
makeDirIfNotExists(remote);
for (LocalSourceFile f : local.getChildren(getUploadFilter()))
upload(listener, f, engine.getPathHelper().adjustForParent(remote, f.getName()));
upload(listener, f, engine.getPathHelper().adjustForParent(remote, f.getName()), 0); // not supporting individual byte offsets for these files
return remote;
}

private String uploadFile(final StreamCopier.Listener listener,
final LocalSourceFile local,
final String remote)
final String remote,
final long byteOffset)
throws IOException {
final String adjusted = prepareFile(local, remote);
final String adjusted = prepareFile(local, remote, byteOffset);
RemoteFile rf = null;
InputStream fis = null;
RemoteFile.RemoteFileOutputStream rfos = null;
EnumSet<OpenMode> modes;
try {
rf = engine.open(adjusted, EnumSet.of(OpenMode.WRITE, OpenMode.CREAT, OpenMode.TRUNC));
if (byteOffset == 0) {
// Starting at the beginning, overwrite/create
modes = EnumSet.of(OpenMode.WRITE, OpenMode.CREAT, OpenMode.TRUNC);
} else {
// Starting at some offset, append
modes = EnumSet.of(OpenMode.WRITE, OpenMode.APPEND);
}

log.debug("Attempting to upload {} with offset={}", local.getName(), byteOffset);
rf = engine.open(adjusted, modes);
fis = local.getInputStream();
rfos = rf.new RemoteFileOutputStream(0, 16);
fis.skip(byteOffset);
rfos = rf.new RemoteFileOutputStream(byteOffset, 16);
new StreamCopier(fis, rfos, engine.getLoggerFactory())
.bufSize(engine.getSubsystem().getRemoteMaxPacketSize() - rf.getOutgoingPacketOverhead())
.keepFlushing(false)
Expand Down Expand Up @@ -294,7 +333,7 @@ private boolean isDirectory(final String remote) throws IOException {
}
}

private String prepareFile(final LocalSourceFile local, final String remote)
private String prepareFile(final LocalSourceFile local, final String remote, final long byteOffset)
throws IOException {
final FileAttributes attrs;
try {
Expand All @@ -309,7 +348,7 @@ private String prepareFile(final LocalSourceFile local, final String remote)
if (attrs.getMode().getType() == FileMode.Type.DIRECTORY) {
throw new IOException("Trying to upload file " + local.getName() + " to path " + remote + " but that is a directory");
} else {
log.debug("probeFile: {} is a {} file that will be replaced", remote, attrs.getMode().getType());
log.debug("probeFile: {} is a {} file that will be {}", remote, attrs.getMode().getType(), byteOffset > 0 ? "resumed" : "replaced");
return remote;
}
}
Expand Down
8 changes: 7 additions & 1 deletion src/main/java/net/schmizz/sshj/xfer/FileSystemFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,13 @@ public InputStream getInputStream()
@Override
public OutputStream getOutputStream()
throws IOException {
return new FileOutputStream(file);
return getOutputStream(false);
}

@Override
public OutputStream getOutputStream(boolean append)
throws IOException {
return new FileOutputStream(file, append);
}

@Override
Expand Down
50 changes: 50 additions & 0 deletions src/main/java/net/schmizz/sshj/xfer/FileTransfer.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,19 @@ public interface FileTransfer {
void upload(String localPath, String remotePath)
throws IOException;

/**
* This is meant to delegate to {@link #upload(LocalSourceFile, String)} with the {@code localPath} wrapped as e.g.
* a {@link FileSystemFile}. Appends to existing if {@code byteOffset} &gt; 0.
*
* @param localPath
* @param remotePath
* @param byteOffset
*
* @throws IOException
*/
void upload(String localPath, String remotePath, long byteOffset)
throws IOException;

/**
* This is meant to delegate to {@link #download(String, LocalDestFile)} with the {@code localPath} wrapped as e.g.
* a {@link FileSystemFile}.
Expand All @@ -43,6 +56,19 @@ void upload(String localPath, String remotePath)
void download(String remotePath, String localPath)
throws IOException;

/**
* This is meant to delegate to {@link #download(String, LocalDestFile)} with the {@code localPath} wrapped as e.g.
* a {@link FileSystemFile}. Appends to existing if {@code byteOffset} &gt; 0.
*
* @param localPath
* @param remotePath
* @param byteOffset
*
* @throws IOException
*/
void download(String remotePath, String localPath, long byteOffset)
throws IOException;

/**
* Upload {@code localFile} to {@code remotePath}.
*
Expand All @@ -54,6 +80,18 @@ void download(String remotePath, String localPath)
void upload(LocalSourceFile localFile, String remotePath)
throws IOException;

/**
* Upload {@code localFile} to {@code remotePath}. Appends to existing if {@code byteOffset} &gt; 0.
*
* @param localFile
* @param remotePath
* @param byteOffset
*
* @throws IOException
*/
void upload(LocalSourceFile localFile, String remotePath, long byteOffset)
throws IOException;

/**
* Download {@code remotePath} to {@code localFile}.
*
Expand All @@ -65,6 +103,18 @@ void upload(LocalSourceFile localFile, String remotePath)
void download(String remotePath, LocalDestFile localFile)
throws IOException;

/**
* Download {@code remotePath} to {@code localFile}. Appends to existing if {@code byteOffset} &gt; 0.
*
* @param localFile
* @param remotePath
* @param byteOffset
*
* @throws IOException
*/
void download(String remotePath, LocalDestFile localFile, long byteOffset)
throws IOException;

TransferListener getTransferListener();

void setTransferListener(TransferListener listener);
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/net/schmizz/sshj/xfer/LocalDestFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,13 @@

public interface LocalDestFile {

long getLength();

OutputStream getOutputStream()
throws IOException;

OutputStream getOutputStream(boolean append)
throws IOException;

/** @return A child file/directory of this directory with given {@code name}. */
LocalDestFile getChild(String name);
Expand Down
Loading

0 comments on commit 3de0302

Please sign in to comment.