Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add parameter to limit read ahead to maximum length. Allows to use mu… #724

Merged
merged 2 commits into from
Dec 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 19 additions & 4 deletions src/main/java/net/schmizz/sshj/sftp/RemoteFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ public class ReadAheadRemoteFileInputStream
private final byte[] b = new byte[1];

private final int maxUnconfirmedReads;
private final long readAheadLimit;
private final Queue<Promise<Response, SFTPException>> unconfirmedReads = new LinkedList<Promise<Response, SFTPException>>();
private final Queue<Long> unconfirmedReadOffsets = new LinkedList<Long>();

Expand All @@ -232,17 +233,22 @@ public class ReadAheadRemoteFileInputStream
private boolean eof;

public ReadAheadRemoteFileInputStream(int maxUnconfirmedReads) {
assert 0 <= maxUnconfirmedReads;

this.maxUnconfirmedReads = maxUnconfirmedReads;
this(maxUnconfirmedReads, 0L, -1L);
}

public ReadAheadRemoteFileInputStream(int maxUnconfirmedReads, long fileOffset) {
/**
*
* @param maxUnconfirmedReads Maximum number of unconfirmed requests to send
* @param fileOffset Initial offset in file to read from
* @param readAheadLimit Read ahead is disabled after this limit has been reached
*/
public ReadAheadRemoteFileInputStream(int maxUnconfirmedReads, long fileOffset, long readAheadLimit) {
assert 0 <= maxUnconfirmedReads;
assert 0 <= fileOffset;

this.maxUnconfirmedReads = maxUnconfirmedReads;
this.requestOffset = this.responseOffset = fileOffset;
this.readAheadLimit = readAheadLimit > 0 ? fileOffset + readAheadLimit : Long.MAX_VALUE;
}

private ByteArrayInputStream pending = new ByteArrayInputStream(new byte[0]);
Expand Down Expand Up @@ -293,9 +299,18 @@ public int read(byte[] into, int off, int len) throws IOException {
while (unconfirmedReads.size() <= maxUnconfirmedReads) {
// Send read requests as long as there is no EOF and we have not reached the maximum parallelism
int reqLen = Math.max(1024, len); // don't be shy!
if (readAheadLimit > requestOffset) {
long remaining = readAheadLimit - requestOffset;
if (reqLen > remaining) {
reqLen = (int) remaining;
}
}
unconfirmedReads.add(RemoteFile.this.asyncRead(requestOffset, reqLen));
unconfirmedReadOffsets.add(requestOffset);
requestOffset += reqLen;
if (requestOffset >= readAheadLimit) {
break;
}
}

long nextOffset = unconfirmedReadOffsets.peek();
Expand Down
90 changes: 90 additions & 0 deletions src/test/java/com/hierynomus/sshj/sftp/RemoteFileTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import net.schmizz.sshj.sftp.OpenMode;
import net.schmizz.sshj.sftp.RemoteFile;
import net.schmizz.sshj.sftp.SFTPEngine;
import net.schmizz.sshj.sftp.SFTPException;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
Expand All @@ -32,6 +33,7 @@

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.fail;

public class RemoteFileTest {
@Rule
Expand Down Expand Up @@ -84,4 +86,92 @@ public void shouldNotGoOutOfBoundsInReadAheadInputStream() throws IOException {

assertThat("The written and received data should match", data, equalTo(test2));
}

@Test
public void shouldNotReadAheadAfterLimitInputStream() throws IOException {
SSHClient ssh = fixture.setupConnectedDefaultClient();
ssh.authPassword("test", "test");
SFTPEngine sftp = new SFTPEngine(ssh).init();

RemoteFile rf;
File file = temp.newFile("SftpReadAheadLimitTest.bin");
rf = sftp.open(file.getPath(), EnumSet.of(OpenMode.WRITE, OpenMode.CREAT));
byte[] data = new byte[8192];
new Random(53).nextBytes(data);
data[3072] = 1;
rf.write(0, data, 0, data.length);
rf.close();

assertThat("The file should exist", file.exists());

rf = sftp.open(file.getPath());
InputStream rs = rf.new ReadAheadRemoteFileInputStream(16 /*maxUnconfirmedReads*/,0, 3072);

byte[] test = new byte[4097];
int n = 0;

while (n < 2048) {
n += rs.read(test, n, 2048 - n);
}

rf.close();

while (n < 3072) {
n += rs.read(test, n, 3072 - n);
}

assertThat("buffer overrun", test[3072] == 0);

try {
rs.read(test, n, test.length - n);
fail("Content must not be buffered");
} catch (SFTPException e){
// expected
}
}

@Test
public void limitedReadAheadInputStream() throws IOException {
SSHClient ssh = fixture.setupConnectedDefaultClient();
ssh.authPassword("test", "test");
SFTPEngine sftp = new SFTPEngine(ssh).init();

RemoteFile rf;
File file = temp.newFile("SftpReadAheadLimitedTest.bin");
rf = sftp.open(file.getPath(), EnumSet.of(OpenMode.WRITE, OpenMode.CREAT));
byte[] data = new byte[8192];
new Random(53).nextBytes(data);
data[3072] = 1;
rf.write(0, data, 0, data.length);
rf.close();

assertThat("The file should exist", file.exists());

rf = sftp.open(file.getPath());
InputStream rs = rf.new ReadAheadRemoteFileInputStream(16 /*maxUnconfirmedReads*/,0, 3072);

byte[] test = new byte[4097];
int n = 0;

while (n < 2048) {
n += rs.read(test, n, 2048 - n);
}

while (n < 3072) {
n += rs.read(test, n, 3072 - n);
}

assertThat("buffer overrun", test[3072] == 0);

n += rs.read(test, n, test.length - n); // --> ArrayIndexOutOfBoundsException

byte[] test2 = new byte[data.length];
System.arraycopy(test, 0, test2, 0, test.length);

while (n < data.length) {
n += rs.read(test2, n, data.length - n);
}

assertThat("The written and received data should match", data, equalTo(test2));
}
}