-
Notifications
You must be signed in to change notification settings - Fork 606
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix ReadAheadRemoteFileInputStream not reading the whole file if a buffer is too big #769
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -220,16 +220,42 @@ public int read(byte[] into, int off, int len) throws IOException { | |
|
||
public class ReadAheadRemoteFileInputStream | ||
extends InputStream { | ||
private class UnconfirmedRead { | ||
private final long offset; | ||
private final Promise<Response, SFTPException> promise; | ||
private final int length; | ||
|
||
private UnconfirmedRead(long offset, int length, Promise<Response, SFTPException> promise) { | ||
this.offset = offset; | ||
this.length = length; | ||
this.promise = promise; | ||
} | ||
|
||
UnconfirmedRead(long offset, int length) throws IOException { | ||
this(offset, length, RemoteFile.this.asyncRead(offset, length)); | ||
} | ||
|
||
public long getOffset() { | ||
return offset; | ||
} | ||
|
||
public Promise<Response, SFTPException> getPromise() { | ||
return promise; | ||
} | ||
|
||
public int getLength() { | ||
return length; | ||
} | ||
} | ||
|
||
private final byte[] b = new byte[1]; | ||
|
||
private final int maxUnconfirmedReads; | ||
private final long readAheadLimit; | ||
private final Queue<Promise<Response, SFTPException>> unconfirmedReads = new LinkedList<Promise<Response, SFTPException>>(); | ||
private final Queue<Long> unconfirmedReadOffsets = new LinkedList<Long>(); | ||
private final Queue<UnconfirmedRead> unconfirmedReads = new LinkedList<>(); | ||
|
||
private long requestOffset; | ||
private long responseOffset; | ||
private long currentOffset; | ||
private int maxReadLength = Integer.MAX_VALUE; | ||
private boolean eof; | ||
|
||
public ReadAheadRemoteFileInputStream(int maxUnconfirmedReads) { | ||
|
@@ -247,28 +273,42 @@ public ReadAheadRemoteFileInputStream(int maxUnconfirmedReads, long fileOffset, | |
assert 0 <= fileOffset; | ||
|
||
this.maxUnconfirmedReads = maxUnconfirmedReads; | ||
this.requestOffset = this.responseOffset = fileOffset; | ||
this.currentOffset = fileOffset; | ||
this.readAheadLimit = readAheadLimit > 0 ? fileOffset + readAheadLimit : Long.MAX_VALUE; | ||
} | ||
|
||
private ByteArrayInputStream pending = new ByteArrayInputStream(new byte[0]); | ||
|
||
private boolean retrieveUnconfirmedRead(boolean blocking) throws IOException { | ||
if (unconfirmedReads.size() <= 0) { | ||
final UnconfirmedRead unconfirmedRead = unconfirmedReads.peek(); | ||
if (unconfirmedRead == null || !blocking && !unconfirmedRead.getPromise().isDelivered()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. OperatorPrecedence: Use grouping parenthesis to make the operator precedence explicit (details) |
||
return false; | ||
} | ||
unconfirmedReads.remove(unconfirmedRead); | ||
|
||
if (!blocking && !unconfirmedReads.peek().isDelivered()) { | ||
return false; | ||
} | ||
|
||
unconfirmedReadOffsets.remove(); | ||
final Response res = unconfirmedReads.remove().retrieve(requester.getTimeoutMs(), TimeUnit.MILLISECONDS); | ||
final Response res = unconfirmedRead.promise.retrieve(requester.getTimeoutMs(), TimeUnit.MILLISECONDS); | ||
switch (res.getType()) { | ||
case DATA: | ||
int recvLen = res.readUInt32AsInt(); | ||
responseOffset += recvLen; | ||
pending = new ByteArrayInputStream(res.array(), res.rpos(), recvLen); | ||
if (unconfirmedRead.offset == currentOffset) { | ||
currentOffset += recvLen; | ||
pending = new ByteArrayInputStream(res.array(), res.rpos(), recvLen); | ||
|
||
if (recvLen < unconfirmedRead.length) { | ||
// The server returned a packet smaller than the client had requested. | ||
// It can be caused by at least one of the following: | ||
// * The file has been read fully. Then, few futile read requests can be sent during | ||
// the next read(), but the file will be downloaded correctly anyway. | ||
// * The server shapes the request length. Then, the read window will be adjusted, | ||
// and all further read-ahead requests won't be shaped. | ||
// * The file on the server is not a regular file, it is something like fifo. | ||
// Then, the window will shrink, and the client will start reading the file slower than it | ||
// hypothetically can. It must be a rare case, and it is not worth implementing a sort of | ||
// congestion control algorithm here. | ||
maxReadLength = recvLen; | ||
unconfirmedReads.clear(); | ||
} | ||
} | ||
break; | ||
|
||
case STATUS: | ||
|
@@ -296,49 +336,24 @@ public int read(byte[] into, int off, int len) throws IOException { | |
// we also need to go here for len <= 0, because pending may be at | ||
// EOF in which case it would return -1 instead of 0 | ||
|
||
long requestOffset = currentOffset; | ||
while (unconfirmedReads.size() <= maxUnconfirmedReads) { | ||
// Send read requests as long as there is no EOF and we have not reached the maximum parallelism | ||
int reqLen = Math.max(1024, len); // don't be shy! | ||
int reqLen = Math.min(Math.max(1024, len), maxReadLength); | ||
if (readAheadLimit > requestOffset) { | ||
long remaining = readAheadLimit - requestOffset; | ||
if (reqLen > remaining) { | ||
reqLen = (int) remaining; | ||
} | ||
} | ||
unconfirmedReads.add(RemoteFile.this.asyncRead(requestOffset, reqLen)); | ||
unconfirmedReadOffsets.add(requestOffset); | ||
unconfirmedReads.add(new UnconfirmedRead(requestOffset, reqLen)); | ||
requestOffset += reqLen; | ||
if (requestOffset >= readAheadLimit) { | ||
break; | ||
} | ||
} | ||
|
||
long nextOffset = unconfirmedReadOffsets.peek(); | ||
if (responseOffset != nextOffset) { | ||
|
||
// the server could not give us all the data we needed, so | ||
// we try to fill the gap synchronously | ||
|
||
assert responseOffset < nextOffset; | ||
assert 0 < (nextOffset - responseOffset); | ||
assert (nextOffset - responseOffset) <= Integer.MAX_VALUE; | ||
|
||
byte[] buf = new byte[(int) (nextOffset - responseOffset)]; | ||
int recvLen = RemoteFile.this.read(responseOffset, buf, 0, buf.length); | ||
|
||
if (recvLen < 0) { | ||
eof = true; | ||
return -1; | ||
} | ||
|
||
if (0 == recvLen) { | ||
// avoid infinite loops | ||
throw new SFTPException("Unexpected response size (0), bailing out"); | ||
} | ||
|
||
responseOffset += recvLen; | ||
pending = new ByteArrayInputStream(buf, 0, recvLen); | ||
} else if (!retrieveUnconfirmedRead(true /*blocking*/)) { | ||
if (!retrieveUnconfirmedRead(true /*blocking*/)) { | ||
|
||
// this may happen if we change prefetch strategy | ||
// currently, we should never get here... | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
JdkObsolete: It is very rare for LinkedList to out-perform ArrayList or ArrayDeque. Avoid it unless you're willing to invest a lot of time into benchmarking. Caveat: LinkedList supports null elements, but ArrayDeque does not. (details)
(at-me in a reply with "help" or "ignore")