Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add support for multiple CopyData messages #40

Merged
merged 20 commits into from
Feb 22, 2022
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ public MutationWriter getMutationWriter() {
return this.mutationWriter;
}

/** @return 0 for text/csv formatting and 1 for binary */
public int getFormatCode() {
return (options.getFormat() == CopyTreeParser.CopyOptions.Format.BINARY) ? 1 : 0;
}

private void verifyCopyColumns() throws SQLException {
if (options.getColumnNames().size() == 0) {
// Use all columns if none were specified.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.cloud.spanner.jdbc.CloudSpannerJdbcConnection;
import com.google.cloud.spanner.pgadapter.ConnectionHandler;
import com.google.spanner.v1.TypeCode;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
Expand All @@ -42,18 +43,17 @@ public class MutationWriter {
private boolean hasHeader;
private boolean isHeaderParsed;
private int mutationCount;
private int batchSize;
private int rowCount;
private List<Mutation> mutations;
private String tableName;
private Map<String, TypeCode> tableColumns;
private CSVFormat format;
private FileWriter fileWriter;
private ByteArrayOutputStream payload = new ByteArrayOutputStream();

public MutationWriter(
String tableName, Map<String, TypeCode> tableColumns, CSVFormat format, boolean hasHeader) {
this.mutationCount = 0;
this.batchSize = 0;
this.hasHeader = hasHeader;
this.isHeaderParsed = false;
this.tableName = tableName;
Expand All @@ -72,25 +72,22 @@ public int getRowCount() {
return this.rowCount;
}

/** Build mutation to add to mutations list with data contained within a CopyData payload */
public void buildMutation(ConnectionHandler connectionHandler, byte[] payload) throws Exception {
List<CSVRecord> records = parsePayloadData(payload);
if (!records.isEmpty()
&& !payloadFitsInCurrentBatch(records.size() * records.get(0).size(), payload.length)) {
rollback(connectionHandler, payload);
long mutationCount = this.mutationCount + records.size() * records.get(0).size();
long commitSize = this.batchSize + payload.length;
public void addCopyData(ConnectionHandler connectionHandler, byte[] payload) throws Exception {
this.payload.write(payload, 0, payload.length);
if (!commitSizeIsWithinLimit()) {
rollback(connectionHandler);
throw new SQLException(
"Mutation count: "
+ mutationCount
+ " or mutation commit size: "
+ commitSize
+ " has exceeded the limit.");
"Commit size: " + this.payload.size() + " has exceeded the limit: " + COMMIT_LIMIT);
}
}

/** Build mutation to add to mutations list with data contained within a CopyData payload */
public void buildMutationList(ConnectionHandler connectionHandler) throws Exception {
List<CSVRecord> records = parsePayloadData(this.payload.toByteArray());
for (CSVRecord record : records) {
// Check that the number of columns in a record matches the number of columns in the table
if (record.size() != this.tableColumns.keySet().size()) {
rollback(connectionHandler, payload);
rollback(connectionHandler);
throw new SQLException(
"Invalid COPY data: Row length mismatched. Expected "
+ this.tableColumns.keySet().size()
Expand Down Expand Up @@ -125,7 +122,7 @@ public void buildMutation(ConnectionHandler connectionHandler, byte[] payload) t
break;
}
} catch (NumberFormatException | DateTimeParseException e) {
rollback(connectionHandler, payload);
rollback(connectionHandler);
throw new SQLException(
"Invalid input syntax for type "
+ columnType.toString()
Expand All @@ -134,27 +131,39 @@ public void buildMutation(ConnectionHandler connectionHandler, byte[] payload) t
+ recordValue
+ "\"");
} catch (IllegalArgumentException e) {
rollback(connectionHandler, payload);
rollback(connectionHandler);
throw new SQLException("Invalid input syntax for column \"" + columnName + "\"");
} catch (Exception e) {
rollback(connectionHandler, payload);
rollback(connectionHandler);
throw e;
}
}
this.mutations.add(builder.build()); // Add write builder to mutation list
this.mutationCount += record.size(); // Increment the number of mutations being added
this.rowCount++; // Increment the number of COPY rows by one
}
this.batchSize += payload.length; // Increment the batch size based on payload length
if (!mutationCountIsWithinLimit()) {
rollback(connectionHandler);
throw new SQLException(
"Mutation count: " + mutationCount + " has exceeded the limit: " + MUTATION_LIMIT);
}
}

/**
* @return True if adding payload to current batch will fit under mutation limit and batch size
* limit, false otherwise.
* @return True if current payload will fit within COMMIT_LIMIT. This is only an estimate and the
* actual commit size may still be rejected by Spanner.
*/
private boolean payloadFitsInCurrentBatch(int rowMutationCount, int payloadLength) {
return (this.mutationCount + rowMutationCount <= MUTATION_LIMIT
&& this.batchSize + payloadLength <= COMMIT_LIMIT);
private boolean commitSizeIsWithinLimit() {
return this.payload.size() <= COMMIT_LIMIT;
}

/**
* @return True if current mutation count will fit within MUTATION_LIMIT. This is only an estimate
* and the actual number of mutations may be different which could result in spanner rejecting
* the transaction.
*/
private boolean mutationCountIsWithinLimit() {
return this.mutationCount <= MUTATION_LIMIT;
}

/** @return list of CSVRecord rows parsed with CSVParser from CopyData payload byte array */
Expand Down Expand Up @@ -186,44 +195,40 @@ public int writeToSpanner(ConnectionHandler connectionHandler) throws SQLExcepti
// Reset mutations, mutation counter, and batch size count for a new batch
this.mutations = new ArrayList<>();
this.mutationCount = 0;
this.batchSize = 0;
return this.rowCount;
}

public void rollback(ConnectionHandler connectionHandler, byte[] payload) throws Exception {
public void rollback(ConnectionHandler connectionHandler) throws Exception {
Connection connection = connectionHandler.getJdbcConnection();
connection.rollback();
this.mutations = new ArrayList<>();
this.mutationCount = 0;
this.batchSize = 0;
createErrorFile(payload);
writeCopyDataToErrorFile(this.payload.toByteArray());
this.payload.reset();
}

public void createErrorFile(byte[] payload) throws IOException {
private void createErrorFile() throws IOException {
File unsuccessfulCopy = new File(ERROR_FILE);
if (unsuccessfulCopy.createNewFile()) {
this.fileWriter = new FileWriter(ERROR_FILE);
writeToErrorFile(payload);
} else {
System.err.println("File " + unsuccessfulCopy.getName() + " already exists");
}
this.fileWriter = new FileWriter(unsuccessfulCopy, false);
}

public void writeToErrorFile(byte[] payload) throws IOException {
if (this.fileWriter != null) {
this.fileWriter.write(new String(payload, StandardCharsets.UTF_8).trim() + "\n");
/**
* Copy data will be written to an error file if size limits were exceeded or a problem was
* encountered.
*/
public void writeCopyDataToErrorFile(byte[] payload) throws IOException {
if (this.fileWriter == null) {
createErrorFile();
}
this.fileWriter.write(new String(payload, StandardCharsets.UTF_8).trim() + "\n");
}

public void writeMutationsToErrorFile() throws IOException {
File unsuccessfulCopy = new File(ERROR_FILE);
if (unsuccessfulCopy.createNewFile()) {
this.fileWriter = new FileWriter(ERROR_FILE);
}

for (Mutation mutation : this.mutations) {
this.fileWriter.write(mutation.toString());
}
/**
* Copy data will be written to an error file if a problem was encountered while generating the
* mutaiton list or if Spanner returns an error upon commiting the mutations.
*/
public void writeCopyDataToErrorFile() throws IOException {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think every time we write to the error file we write the full payload so you can just have a single version of writeCopyDataToErrorFile() without having a payload argument

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

writeCopyDataToErrorFile(this.payload.toByteArray());
}

public void closeErrorFile() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,14 @@ protected void sendPayload() throws Exception {
MutationWriter mw = this.statement.getMutationWriter();
if (!statement.hasException()) {
try {
mw.buildMutation(this.connection, this.payload);
mw.addCopyData(this.connection, this.payload);
} catch (SQLException e) {
mw.writeToErrorFile(this.payload);
mw.writeCopyDataToErrorFile(this.payload);
statement.handleExecutionException(e);
throw e;
}
} else {
mw.writeToErrorFile(this.payload);
mw.writeCopyDataToErrorFile(this.payload);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,21 +43,26 @@ protected void sendPayload() throws Exception {
MutationWriter mw = this.statement.getMutationWriter();
if (!statement.hasException()) {
try {
mw.buildMutationList(this.connection);
int rowCount =
mw.writeToSpanner(this.connection); // Write any remaining mutations to Spanner
statement.addUpdateCount(rowCount); // Increase the row count of number of rows copied.
this.sendSpannerResult(this.statement, QueryMode.SIMPLE, 0L);
} catch (Exception e) {
// Spanner returned an error when trying to commit the batch of mutations.
mw.writeMutationsToErrorFile();
mw.writeCopyDataToErrorFile();
mw.closeErrorFile();
// TODO: enable in next PR
// this.connection.setStatus(ConnectionStatus.IDLE);
this.connection.removeActiveStatement(this.statement);
throw e;
}
} else {
mw.closeErrorFile();
}
new ReadyResponse(this.outputStream, ReadyResponse.Status.IDLE).send();
// TODO: enable in next PR
// this.connection.setStatus(ConnectionStatus.IDLE);
this.connection.removeActiveStatement(this.statement);
}

Expand Down
Loading