-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use per-JVM lock in S3DynamoDBLogStore to reduce number of N.temp.json -> N.json
copies
#1711
Conversation
if (overwrite) { | ||
writeActions(fs, path, actions); | ||
return; | ||
} else if (fs.exists(path)) { | ||
// Step 0: Fail if N.json already exists in FileSystem and overwrite=false. | ||
throw new java.nio.file.FileAlreadyExistsException(path.toString()); | ||
} | ||
|
||
// Step 1: Ensure that N-1.json exists | ||
final Path tablePath = getTablePath(resolvedPath); | ||
if (FileNameUtils.isDeltaFile(path)) { | ||
final long version = FileNameUtils.deltaVersion(path); | ||
if (version > 0) { | ||
final long prevVersion = version - 1; | ||
final Path deltaLogPath = new Path(tablePath, "_delta_log"); | ||
final Path prevPath = FileNameUtils.deltaFile(deltaLogPath, prevVersion); | ||
final String prevFileName = prevPath.getName(); | ||
final Optional<ExternalCommitEntry> prevEntry = getExternalEntry( | ||
tablePath.toString(), | ||
prevFileName | ||
); | ||
if (prevEntry.isPresent() && !prevEntry.get().complete) { | ||
fixDeltaLog(fs, prevEntry.get()); | ||
} else { | ||
if (!fs.exists(prevPath)) { | ||
throw new java.nio.file.FileSystemException( | ||
String.format("previous commit %s doesn't exist", prevPath) | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just indentation change
try { | ||
// Step 3: COMMIT the commit to the delta log. | ||
// Copy T(N) -> N.json with overwrite=false | ||
writeCopyTempFile(fs, entry.absoluteTempPath(), resolvedPath); | ||
|
||
// Step 4: ACKNOWLEDGE the commit | ||
writePutCompleteDbEntry(entry); | ||
} catch (Throwable e) { | ||
LOG.info( | ||
"{}: ignoring recoverable error", e.getClass().getSimpleName(), e | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indentation change
int retry = 0; | ||
boolean copied = false; | ||
while (true) { | ||
LOG.debug("trying to fix: {}", entry.fileName); | ||
try { | ||
if (!copied && !fs.exists(entry.absoluteFilePath())) { | ||
fixDeltaLogCopyTempFile(fs, entry.absoluteTempPath(), entry.absoluteFilePath()); | ||
copied = true; | ||
} | ||
fixDeltaLogPutCompleteDbEntry(entry); | ||
LOG.info("fixed {}", entry.fileName); | ||
return; | ||
} catch(Throwable e) { | ||
LOG.info("{}:", e.getClass().getSimpleName(), e); | ||
if (retry >= 3) { | ||
throw e; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indentation change + put within try
block
Can you rename T(N) to something N.temp.json? its not obvious without prior context what this T(N) is. |
T(N) -> N.json
copiesN.temp.json -> N.json
copies
// | ||
// Also note that this lock path (resolvedPath) is for N.json, while the lock path used | ||
// below in the recovery `fixDeltaLog` path is for N-1.json. Thus, no deadlock. | ||
pathLock.acquire(resolvedPath); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you remind me why we need to lock the write path? Currently, PathLock
doesn't support nested locking.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We want to lock on the write path since, during the write path, we will copy T(N) --> N.json
.
As soon as we write T(N)
and commit E(N, T(N), complete=false)
a reader could come, see the entry E, and then copy T(N) --> N.json
.
By having the lock on the write path, we prevent readers from duplicating the copy into N.json.
What do you mean by "nesting"? It is per-path? So we can have a lock on N.json (writer), and then see that N-1.json doesn't exist, so try and grab a lock for N-1.json too to perform the recovery?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we prevent readers from duplicating the copy into N.json.
Is it still possible after we lock fixDeltaLog
?
What do you mean by "nesting"?
The following pattern doesn't work when using the same path.
pathLock.acquire(resolvedPath);
try {
pathLock.acquire(resolvedPath);
try {
} finally {
pathLock.release(resolvedPath);
}
} finally {
pathLock.release(resolvedPath);
}
But I see your point: we lock different paths.
A high-level question: can we just lock BaseExternalLogStore.copyFile
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it still possible after we lock fixDeltaLog?
Yes. Imagine we have two writers A and B. Imagine A grabs the lock, and B does not grab the lock. Then even though A has the lock, B can still write into the same file as A.
Thus, every writer must grab the lock!
Nesting
Ah yes I see what you mean. But yes we lock on different paths
Description
Adds a global (per JVM) path lock to S3DynamoDBLogStore to reduce the number of
T(N) -> N.json
copies, which can occur when there are concurrent readers/writers.Note: multiple
T(N) -> N.json
copies will not cause data loss, but it may impact readers who already have an existing InputStream open on that particular file.How was this patch tested?
It's really hard to test this specific concurrency issue. Code review + existing tests.
Does this PR introduce any user-facing changes?
No.