Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: exception on concurrent download of ParseFile from multiple threads #1180

Merged
merged 6 commits into from
Aug 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 77 additions & 58 deletions parse/src/main/java/com/parse/ParseFileController.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import com.parse.http.ParseHttpRequest;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import org.json.JSONObject;

Expand All @@ -21,6 +23,7 @@ class ParseFileController {
private final Object lock = new Object();
private final ParseHttpClient restClient;
private final File cachePath;
private final List<String> currentlyDownloadedFilesNames = new ArrayList<>();

private ParseHttpClient fileClient;

Expand Down Expand Up @@ -168,64 +171,80 @@ public Task<File> fetchAsync(
if (cancellationToken != null && cancellationToken.isCancelled()) {
return Task.cancelled();
}
final File cacheFile = getCacheFile(state);
return Task.call(cacheFile::exists, ParseExecutors.io())
.continueWithTask(
task -> {
boolean result = task.getResult();
if (result) {
return Task.forResult(cacheFile);
}
if (cancellationToken != null && cancellationToken.isCancelled()) {
return Task.cancelled();
return Task.call(
() -> {
final File cacheFile = getCacheFile(state);

synchronized (lock) {
if (currentlyDownloadedFilesNames.contains(state.name())) {
while (currentlyDownloadedFilesNames.contains(state.name())) {
lock.wait();
}

// Generate the temp file path for caching ParseFile content based on
// ParseFile's url
// The reason we do not write to the cacheFile directly is because there
// is no way we can
// verify if a cacheFile is complete or not. If download is interrupted
// in the middle, next
// time when we download the ParseFile, since cacheFile has already
// existed, we will return
// this incomplete cacheFile
final File tempFile = getTempFile(state);

// network
final ParseFileRequest request =
new ParseFileRequest(
ParseHttpRequest.Method.GET, state.url(), tempFile);

// We do not need to delete the temp file since we always try to
// overwrite it
return request.executeAsync(
fileClient(),
null,
downloadProgressCallback,
cancellationToken)
.continueWithTask(
task1 -> {
// If the top-level task was cancelled, don't
// actually set the data -- just move on.
if (cancellationToken != null
&& cancellationToken.isCancelled()) {
throw new CancellationException();
}
if (task1.isFaulted()) {
ParseFileUtils.deleteQuietly(tempFile);
return task1.cast();
}

// Since we give the cacheFile pointer to
// developers, it is not safe to guarantee
// cacheFile always does not exist here, so it is
// better to delete it manually,
// otherwise moveFile may throw an exception.
ParseFileUtils.deleteQuietly(cacheFile);
ParseFileUtils.moveFile(tempFile, cacheFile);
return Task.forResult(cacheFile);
},
ParseExecutors.io());
});
}

if (cacheFile.exists()) {
return cacheFile;
} else {
currentlyDownloadedFilesNames.add(state.name());
}
}

try {
if (cancellationToken != null && cancellationToken.isCancelled()) {
throw new CancellationException();
}

// Generate the temp file path for caching ParseFile content based on
// ParseFile's url
// The reason we do not write to the cacheFile directly is because there
// is no way we can
// verify if a cacheFile is complete or not. If download is interrupted
// in the middle, next
// time when we download the ParseFile, since cacheFile has already
// existed, we will return
// this incomplete cacheFile
final File tempFile = getTempFile(state);

// network
final ParseFileRequest request =
new ParseFileRequest(
ParseHttpRequest.Method.GET, state.url(), tempFile);

// We do not need to delete the temp file since we always try to
// overwrite it
Task<Void> downloadTask =
request.executeAsync(
fileClient(),
null,
downloadProgressCallback,
cancellationToken);
ParseTaskUtils.wait(downloadTask);

// If the top-level task was cancelled, don't
// actually set the data -- just move on.
if (cancellationToken != null && cancellationToken.isCancelled()) {
throw new CancellationException();
}
if (downloadTask.isFaulted()) {
ParseFileUtils.deleteQuietly(tempFile);
throw new RuntimeException(downloadTask.getError());
}

// Since we give the cacheFile pointer to
// developers, it is not safe to guarantee
// cacheFile always does not exist here, so it is
// better to delete it manually,
// otherwise moveFile may throw an exception.
ParseFileUtils.deleteQuietly(cacheFile);
ParseFileUtils.moveFile(tempFile, cacheFile);
return cacheFile;
} finally {
synchronized (lock) {
currentlyDownloadedFilesNames.remove(state.name());
lock.notifyAll();
}
}
},
ParseExecutors.io());
}
}
64 changes: 64 additions & 0 deletions parse/src/test/java/com/parse/ParseFileControllerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import org.json.JSONObject;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -341,5 +343,67 @@ public void testFetchAsyncFailure() throws Exception {
assertFalse(controller.getTempFile(state).exists());
}

@Test
public void testFetchAsyncConcurrentCallsSuccess() throws Exception {
byte[] data = "hello".getBytes();
ParseHttpResponse mockResponse =
new ParseHttpResponse.Builder()
.setStatusCode(200)
.setTotalSize((long) data.length)
.setContent(new ByteArrayInputStream(data))
.build();

ParseHttpClient fileClient = mock(ParseHttpClient.class);
when(fileClient.execute(any(ParseHttpRequest.class))).thenReturn(mockResponse);
// Make sure cache dir does not exist
File root = new File(temporaryFolder.getRoot(), "cache");
assertFalse(root.exists());
ParseFileController controller = new ParseFileController(null, root).fileClient(fileClient);
ParseFile.State state = new ParseFile.State.Builder().name("file_name").url("url").build();
CountDownLatch countDownLatch = new CountDownLatch(2);
AtomicReference<File> file1Ref = new AtomicReference<>();
AtomicReference<File> file2Ref = new AtomicReference<>();

new Thread(
() -> {
try {
file1Ref.set(
ParseTaskUtils.wait(
controller.fetchAsync(state, null, null, null)));
} catch (ParseException e) {
throw new RuntimeException(e);
} finally {
countDownLatch.countDown();
}
})
.start();

new Thread(
() -> {
try {
file2Ref.set(
ParseTaskUtils.wait(
controller.fetchAsync(state, null, null, null)));
} catch (ParseException e) {
throw new RuntimeException(e);
} finally {
countDownLatch.countDown();
}
})
.start();

countDownLatch.await();

File result1 = file1Ref.get();
File result2 = file2Ref.get();

assertTrue(result1.exists());
assertEquals("hello", ParseFileUtils.readFileToString(result1, "UTF-8"));
assertTrue(result2.exists());
assertEquals("hello", ParseFileUtils.readFileToString(result2, "UTF-8"));
verify(fileClient, times(1)).execute(any(ParseHttpRequest.class));
assertFalse(controller.getTempFile(state).exists());
}

// endregion
}