Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add support for multiple CopyData messages #40

Merged
merged 20 commits into from
Feb 22, 2022
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ public MutationWriter getMutationWriter() {
return this.mutationWriter;
}

/** @return 0 for text/csv formatting and 1 for binary */
public int getFormatCode() {
return (options.getFormat() == CopyTreeParser.CopyOptions.Format.BINARY) ? 1 : 0;
}

private void verifyCopyColumns() throws SQLException {
if (options.getColumnNames().size() == 0) {
// Use all columns if none were specified.
Expand Down Expand Up @@ -168,7 +173,7 @@ private void queryInformationSchema() throws SQLException {
+ COLUMN_NAME
+ ", "
+ SPANNER_TYPE
+ " FROM information_schema.columns WHERE table_name = \"?\"");
+ " FROM information_schema.columns WHERE table_name = ?");
tinaspark marked this conversation as resolved.
Show resolved Hide resolved
statement.setString(1, getTableName());
ResultSet result = statement.executeQuery();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.cloud.spanner.jdbc.CloudSpannerJdbcConnection;
import com.google.cloud.spanner.pgadapter.ConnectionHandler;
import com.google.spanner.v1.TypeCode;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
Expand All @@ -42,18 +43,17 @@ public class MutationWriter {
private boolean hasHeader;
private boolean isHeaderParsed;
private int mutationCount;
private int batchSize;
private int rowCount;
private List<Mutation> mutations;
private String tableName;
private Map<String, TypeCode> tableColumns;
private CSVFormat format;
private FileWriter fileWriter;
private ByteArrayOutputStream payload = new ByteArrayOutputStream();

public MutationWriter(
String tableName, Map<String, TypeCode> tableColumns, CSVFormat format, boolean hasHeader) {
this.mutationCount = 0;
this.batchSize = 0;
this.hasHeader = hasHeader;
this.isHeaderParsed = false;
this.tableName = tableName;
Expand All @@ -72,25 +72,22 @@ public int getRowCount() {
return this.rowCount;
}

/** Build mutation to add to mutations list with data contained within a CopyData payload */
public void buildMutation(ConnectionHandler connectionHandler, byte[] payload) throws Exception {
List<CSVRecord> records = parsePayloadData(payload);
if (!records.isEmpty()
&& !payloadFitsInCurrentBatch(records.size() * records.get(0).size(), payload.length)) {
rollback(connectionHandler, payload);
long mutationCount = this.mutationCount + records.size() * records.get(0).size();
long commitSize = this.batchSize + payload.length;
public void addMutations(ConnectionHandler connectionHandler, byte[] payload) throws Exception {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you name this addCopyData since we aren't doing anything with mutations yet

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

this.payload.write(payload, 0, payload.length);
if (!payloadFitsInCurrentBatch()) {
rollback(connectionHandler);
throw new SQLException(
"Mutation count: "
+ mutationCount
+ " or mutation commit size: "
+ commitSize
+ " has exceeded the limit.");
"Commit size: " + this.payload.size() + " has exceeded the limit: " + COMMIT_LIMIT);
}
}

/** Build mutation to add to mutations list with data contained within a CopyData payload */
public void buildMutationList(ConnectionHandler connectionHandler) throws Exception {
List<CSVRecord> records = parsePayloadData(this.payload.toByteArray());
for (CSVRecord record : records) {
// Check that the number of columns in a record matches the number of columns in the table
if (record.size() != this.tableColumns.keySet().size()) {
rollback(connectionHandler, payload);
rollback(connectionHandler);
throw new SQLException(
"Invalid COPY data: Row length mismatched. Expected "
+ this.tableColumns.keySet().size()
Expand Down Expand Up @@ -125,7 +122,7 @@ public void buildMutation(ConnectionHandler connectionHandler, byte[] payload) t
break;
}
} catch (NumberFormatException | DateTimeParseException e) {
rollback(connectionHandler, payload);
rollback(connectionHandler);
throw new SQLException(
"Invalid input syntax for type "
+ columnType.toString()
Expand All @@ -134,27 +131,39 @@ public void buildMutation(ConnectionHandler connectionHandler, byte[] payload) t
+ recordValue
+ "\"");
} catch (IllegalArgumentException e) {
rollback(connectionHandler, payload);
rollback(connectionHandler);
throw new SQLException("Invalid input syntax for column \"" + columnName + "\"");
} catch (Exception e) {
rollback(connectionHandler, payload);
rollback(connectionHandler);
throw e;
}
}
this.mutations.add(builder.build()); // Add write builder to mutation list
this.mutationCount += record.size(); // Increment the number of mutations being added
this.rowCount++; // Increment the number of COPY rows by one
}
this.batchSize += payload.length; // Increment the batch size based on payload length
if (!mutationCountIsWithinLimit()) {
rollback(connectionHandler);
throw new SQLException(
"Mutation count: " + mutationCount + " has exceeded the limit: " + MUTATION_LIMIT);
}
}

/**
* @return True if current payload will fit within COMMIT_LIMIT. This is only an estimate and the
* actual commit size may still be rejected by Spanner.
*/
private boolean payloadFitsInCurrentBatch() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we name this commitSizeIsWithinLimit to match the naming of mutationCountIsWithinLimit? Also because the batching has been removed

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

return this.payload.size() <= COMMIT_LIMIT;
}

/**
* @return True if adding payload to current batch will fit under mutation limit and batch size
* limit, false otherwise.
* @return True if current mutation count will fit within MUTATION_LIMIT. This is only an estimate
* and the actual number of mutations may be different which could result in spanner rejecting
* the transaction.
*/
private boolean payloadFitsInCurrentBatch(int rowMutationCount, int payloadLength) {
return (this.mutationCount + rowMutationCount <= MUTATION_LIMIT
&& this.batchSize + payloadLength <= COMMIT_LIMIT);
private boolean mutationCountIsWithinLimit() {
return this.mutationCount <= MUTATION_LIMIT;
}

/** @return list of CSVRecord rows parsed with CSVParser from CopyData payload byte array */
Expand Down Expand Up @@ -186,24 +195,22 @@ public int writeToSpanner(ConnectionHandler connectionHandler) throws SQLExcepti
// Reset mutations, mutation counter, and batch size count for a new batch
this.mutations = new ArrayList<>();
this.mutationCount = 0;
this.batchSize = 0;
return this.rowCount;
}

public void rollback(ConnectionHandler connectionHandler, byte[] payload) throws Exception {
public void rollback(ConnectionHandler connectionHandler) throws Exception {
Connection connection = connectionHandler.getJdbcConnection();
connection.rollback();
this.mutations = new ArrayList<>();
this.mutationCount = 0;
this.batchSize = 0;
createErrorFile(payload);
createErrorFile();
}

public void createErrorFile(byte[] payload) throws IOException {
public void createErrorFile() throws IOException {
File unsuccessfulCopy = new File(ERROR_FILE);
if (unsuccessfulCopy.createNewFile()) {
this.fileWriter = new FileWriter(ERROR_FILE);
writeToErrorFile(payload);
writeToErrorFile(this.payload.toByteArray());
} else {
System.err.println("File " + unsuccessfulCopy.getName() + " already exists");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ protected void sendPayload() throws Exception {
MutationWriter mw = this.statement.getMutationWriter();
if (!statement.hasException()) {
try {
mw.buildMutation(this.connection, this.payload);
mw.addMutations(this.connection, this.payload);
} catch (SQLException e) {
mw.writeToErrorFile(this.payload);
statement.handleExecutionException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ protected void sendPayload() throws Exception {
MutationWriter mw = this.statement.getMutationWriter();
if (!statement.hasException()) {
try {
mw.buildMutationList(this.connection);
int rowCount =
mw.writeToSpanner(this.connection); // Write any remaining mutations to Spanner
statement.addUpdateCount(rowCount); // Increase the row count of number of rows copied.
Expand All @@ -51,13 +52,17 @@ protected void sendPayload() throws Exception {
// Spanner returned an error when trying to commit the batch of mutations.
mw.writeMutationsToErrorFile();
mw.closeErrorFile();
// TODO: enable in next PR
// this.connection.setStatus(ConnectionStatus.IDLE);
this.connection.removeActiveStatement(this.statement);
throw e;
}
} else {
mw.closeErrorFile();
}
new ReadyResponse(this.outputStream, ReadyResponse.Status.IDLE).send();
// TODO: enable in next PR
// this.connection.setStatus(ConnectionStatus.IDLE);
this.connection.removeActiveStatement(this.statement);
}

Expand Down
37 changes: 23 additions & 14 deletions src/test/java/com/google/cloud/spanner/pgadapter/ProtocolTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1202,16 +1202,21 @@ public void testCopyDataMessage() throws Exception {
MutationWriter mb = Mockito.mock(MutationWriter.class);
Mockito.when(copyStatement.getMutationWriter()).thenReturn(mb);

WireMessage message = ControlMessage.create(connectionHandler);
Assert.assertEquals(message.getClass(), CopyDataMessage.class);
Assert.assertArrayEquals(((CopyDataMessage) message).getPayload(), payload);
{
Vizerai marked this conversation as resolved.
Show resolved Hide resolved
WireMessage message = ControlMessage.create(connectionHandler);
Assert.assertEquals(message.getClass(), CopyDataMessage.class);
Assert.assertArrayEquals(((CopyDataMessage) message).getPayload(), payload);

CopyDataMessage messageSpy = (CopyDataMessage) Mockito.spy(message);
messageSpy.send();
CopyDataMessage messageSpy = (CopyDataMessage) Mockito.spy(message);
messageSpy.send();
}
Vizerai marked this conversation as resolved.
Show resolved Hide resolved

Mockito.verify(mb, Mockito.times(1)).buildMutation(connectionHandler, payload);
Mockito.verify(mb, Mockito.times(1)).addMutations(connectionHandler, payload);
}

@Test
public void testMultipleCopyDataMessages() throws Exception {}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume that this is a placeholder for a test that is coming in a next PR?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated now.


@Test
public void testCopyDoneMessage() throws Exception {
byte[] messageMetadata = {'c'};
Expand Down Expand Up @@ -1290,7 +1295,8 @@ public void testCopyFromFilePipe() throws Exception {
copyStatement.execute();

MutationWriter mw = copyStatement.getMutationWriter();
mw.buildMutation(connectionHandler, payload);
mw.addMutations(connectionHandler, payload);
mw.buildMutationList(connectionHandler);

Assert.assertEquals(copyStatement.getFormatType(), "TEXT");
Assert.assertEquals(copyStatement.getDelimiterChar(), '\t');
Expand Down Expand Up @@ -1327,15 +1333,15 @@ public void testCopyBatchSizeLimit() throws Exception {
MutationWriter mw = copyStatement.getMutationWriter();
MutationWriter mwSpy = Mockito.spy(mw);
Mockito.when(mwSpy.writeToSpanner(connectionHandler)).thenReturn(10, 2);
mwSpy.buildMutation(connectionHandler, payload);
mwSpy.addMutations(connectionHandler, payload);
mwSpy.buildMutationList(connectionHandler);
mwSpy.writeToSpanner(connectionHandler);

Assert.assertEquals(copyStatement.getFormatType(), "TEXT");
Assert.assertEquals(copyStatement.getDelimiterChar(), '\t');
Assert.assertEquals(copyStatement.hasException(), false);
Assert.assertEquals(mwSpy.getRowCount(), 12);

// Verify writeToSpanner is called once inside buildMutation when batch size is exceeded
Mockito.verify(mwSpy, Mockito.times(1)).writeToSpanner(connectionHandler);
copyStatement.close();
}
Expand Down Expand Up @@ -1370,7 +1376,8 @@ public void testCopyDataRowLengthMismatchLimit() throws Exception {
Assert.assertThrows(
SQLException.class,
() -> {
mwSpy.buildMutation(connectionHandler, payload);
mwSpy.addMutations(connectionHandler, payload);
mwSpy.buildMutationList(connectionHandler);
;
});
Assert.assertEquals(
Expand Down Expand Up @@ -1408,12 +1415,13 @@ public void testCopyResumeErrorOutputFile() throws Exception {
Assert.assertThrows(
SQLException.class,
() -> {
mwSpy.buildMutation(connectionHandler, payload);
mwSpy.addMutations(connectionHandler, payload);
mwSpy.buildMutationList(connectionHandler);
mwSpy.writeToSpanner(connectionHandler);
});
Assert.assertEquals(thrown.getMessage(), "Invalid input syntax for type INT64:\"'5'\"");

Mockito.verify(mwSpy, Mockito.times(1)).createErrorFile(payload);
Mockito.verify(mwSpy, Mockito.times(1)).createErrorFile();
Mockito.verify(mwSpy, Mockito.times(1)).writeToErrorFile(payload);

File outputFile = new File("output.txt");
Expand Down Expand Up @@ -1454,12 +1462,13 @@ public void testCopyResumeErrorStartOutputFile() throws Exception {
Assert.assertThrows(
SQLException.class,
() -> {
mwSpy.buildMutation(connectionHandler, payload);
mwSpy.addMutations(connectionHandler, payload);
mwSpy.buildMutationList(connectionHandler);
mwSpy.writeToSpanner(connectionHandler);
});
Assert.assertEquals(thrown.getMessage(), "Invalid input syntax for type INT64:\"'1'\"");

Mockito.verify(mwSpy, Mockito.times(1)).createErrorFile(payload);
Mockito.verify(mwSpy, Mockito.times(1)).createErrorFile();
Mockito.verify(mwSpy, Mockito.times(1)).writeToErrorFile(payload);

File outputFile = new File("output.txt");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,8 @@ public void testCopyBuildMutation() throws Exception {

byte[] payload = "2\t3\n".getBytes();
MutationWriter mw = statement.getMutationWriter();
mw.buildMutation(connectionHandler, payload);
mw.addMutations(connectionHandler, payload);
mw.buildMutationList(connectionHandler);

Assert.assertEquals(statement.getFormatType(), "TEXT");
Assert.assertEquals(statement.getDelimiterChar(), '\t');
Expand Down Expand Up @@ -489,7 +490,8 @@ public void testCopyInvalidBuildMutation() throws Exception {
Assert.assertThrows(
SQLException.class,
() -> {
mw.buildMutation(connectionHandler, payload);
mw.addMutations(connectionHandler, payload);
mw.buildMutationList(connectionHandler);
});
Assert.assertEquals(
thrown.getMessage(),
Expand Down