feat: Add support for multiple CopyData messages #40
I will add some additional tests to check row splitting between multiple CopyData messages.
It's hard for me to track all the different things going on, why they depend on each other, and whether all the changes are adequately tested. I'm seeing a few things here:
- Instead of parsing the CopyData payloads as they come in, put them all in a buffer and parse when we receive a CopyDone -- this looks reasonable to me and fits the goal of this PR
- Move parseCommand from IntermediateStatement to StatementParser -- this looks reasonable but should probably be in another PR
- Error checking for COPY_IN in ControlMessage.java and some status handling in QueryMessage.java -- this should probably be in another PR
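To make the first point concrete, here is a minimal sketch of the buffering approach, assuming newline-terminated text rows. The class `CopyDataBuffer` and its method names are hypothetical and are not PGAdapter's actual code; the sketch only illustrates why deferring parsing until CopyDone handles a row that is split across two CopyData messages.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: collects CopyData payloads and splits rows only at CopyDone. */
public class CopyDataBuffer {
  private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

  /** Called for each CopyData message; the payload may end in the middle of a row. */
  public void addCopyData(byte[] payload) {
    buffer.write(payload, 0, payload.length);
  }

  /** Called on CopyDone; assumes '\n' terminates each row (an assumption of this sketch). */
  public List<String> rowsOnCopyDone() {
    // Decode the whole buffer at once so a multi-byte character split across
    // two messages is still decoded correctly.
    String data = new String(buffer.toByteArray(), StandardCharsets.UTF_8);
    List<String> rows = new ArrayList<>();
    for (String line : data.split("\n")) {
      if (!line.isEmpty()) {
        rows.add(line);
      }
    }
    return rows;
  }
}
```

A test for row splitting could feed one row in two pieces through `addCopyData` and check that `rowsOnCopyDone` returns it as a single row.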
src/main/java/com/google/cloud/spanner/pgadapter/statements/CopyStatement.java
I broke off the other changes into separate PRs.
src/main/java/com/google/cloud/spanner/pgadapter/statements/CopyStatement.java
}

@Test
public void testMultipleCopyDataMessages() throws Exception {}
I assume that this is a placeholder for a test that is coming in a next PR?
Updated now.
I think the current build errors might be caused by b/217744569
That assumption was wrong. It's caused by another issue.
 * @return True if current payload will fit within COMMIT_LIMIT. This is only an estimate and the
 *     actual commit size may still be rejected by Spanner.
 */
private boolean payloadFitsInCurrentBatch() {
Can we rename this to commitSizeIsWithinLimit to match the naming of mutationCountIsWithinLimit? The batching has also been removed, so the current name no longer fits.
Done.
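For readers following the rename, a rough sketch of how the two checks could sit side by side. It mirrors the rows-times-columns mutation count and the payload-length size estimate that appear in this PR's diff. The class `CopyLimits`, its fields, and both limit values are hypothetical placeholders, not PGAdapter's or Spanner's actual numbers.

```java
/** Hypothetical sketch of the two limit checks; names and limit values are placeholders. */
public class CopyLimits {
  // Placeholder limits chosen for illustration only.
  static final long MUTATION_LIMIT = 20_000L;
  static final long COMMIT_LIMIT_BYTES = 100L * 1024 * 1024;

  private final long rowCount;
  private final long columnCount;
  private final long payloadBytes;

  public CopyLimits(long rowCount, long columnCount, long payloadBytes) {
    this.rowCount = rowCount;
    this.columnCount = columnCount;
    this.payloadBytes = payloadBytes;
  }

  /** One mutation is counted per column of every row. */
  public boolean mutationCountIsWithinLimit() {
    return rowCount * columnCount <= MUTATION_LIMIT;
  }

  /** Only an estimate: the payload length stands in for the real commit size. */
  public boolean commitSizeIsWithinLimit() {
    return payloadBytes <= COMMIT_LIMIT_BYTES;
  }
}
```

Because the size check is only an estimate, a commit that passes it may still be rejected by Spanner, as the Javadoc in the diff notes.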
rollback(connectionHandler, payload);
long mutationCount = this.mutationCount + records.size() * records.get(0).size();
long commitSize = this.batchSize + payload.length;
public void addMutations(ConnectionHandler connectionHandler, byte[] payload) throws Exception {
Can you rename this to addCopyData, since we aren't doing anything with mutations yet?
Done.
  } else {
    System.err.println("File " + unsuccessfulCopy.getName() + " already exists");
  }
  this.fileWriter = new FileWriter(unsuccessfulCopy, false);
}

public void writeToErrorFile(byte[] payload) throws IOException {
Rename to writeCopyDataToErrorFile.
Add a comment that this is used if there are problems while collecting CopyData messages or building the mutation list.
Done.
  }
  this.fileWriter.write(new String(payload, StandardCharsets.UTF_8).trim() + "\n");
}

public void writeMutationsToErrorFile() throws IOException {
Add a comment that this is used if Spanner returns an error when we try to commit the mutations.
Done.
Actually, thinking about this again: why are we writing the mutations to the file at all? The original intention of the error file was for the user to look through the copy data that they sent to PGAdapter, fix it, and try again. I think we should always be writing the payload data, not the mutation data.
Changed to write the copy data instead.
  if (this.fileWriter == null) {
    createErrorFile();
  }
  this.fileWriter.write(new String(payload, StandardCharsets.UTF_8).trim() + "\n");
}

/**
 * The list of mutaitons will be written to the error file if a problem was encountered while
Typo: should be "mutations".
Reworded comment.
for (Mutation mutation : this.mutations) {
  this.fileWriter.write(mutation.toString());
}
public void writeCopyDataToErrorFile() throws IOException {
I think every time we write to the error file we write the full payload, so you can just have a single version of writeCopyDataToErrorFile() without a payload argument.
Done.
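As an illustration of the single no-argument version discussed in this thread, here is a minimal sketch. The class `CopyErrorFile`, its constructor, and its payload field are hypothetical; only the `FileWriter` usage and the trim-plus-newline write follow the diff. It assumes the class already holds every CopyData payload received so far.

```java
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch: keeps the raw copy data and writes it unchanged on failure. */
public class CopyErrorFile {
  private final ByteArrayOutputStream payload = new ByteArrayOutputStream();
  private final File errorFile;

  public CopyErrorFile(File errorFile) {
    this.errorFile = errorFile;
  }

  /** Collects each CopyData payload as it arrives. */
  public void addCopyData(byte[] data) {
    payload.write(data, 0, data.length);
  }

  /**
   * Writes the full copy data, not the mutations, so the user can fix the input and retry.
   * Intended for errors while collecting CopyData, building mutations, or committing.
   */
  public void writeCopyDataToErrorFile() throws IOException {
    try (FileWriter writer = new FileWriter(errorFile, false)) {
      writer.write(new String(payload.toByteArray(), StandardCharsets.UTF_8).trim() + "\n");
    }
  }
}
```

Since the method reads the payload from the instance, every failure path can call the same method, and the file always contains exactly what the client sent.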