Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add Copy command parsing in QueryMessage and basic psql e2e test #43

Merged
merged 21 commits into from
Feb 24, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .ci/e2e-expected/copy-from-stdin.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
id | age | name
----+-----+------
1 | 1 | John
2 | 20 | Joe
3 | 23 | Jack
5 | 5 | 5
6 | 6 | 6
7 | 7 | 7
8 | 8 | 8
9 | 9 | 9
10 | 10 | 10
(9 rows)
14 changes: 14 additions & 0 deletions .ci/evaluate-with-psql.sh
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,17 @@ echo "------Test \"-c option invalid begin/commit batching\"------"
/usr/lib/postgresql/"${PSQL_VERSION}"/bin/psql -h localhost -p 4242 -d "${GOOGLE_CLOUD_DATABASE_WITH_VERSION}" -c "$(cat .ci/e2e-batching/invalid-commit-batch.txt)" &> .ci/e2e-result/invalid-commit-batching.txt
diff -i -w -s .ci/e2e-result/invalid-commit-batching.txt .ci/e2e-expected/invalid-commit-batching.txt
RETURN_CODE=$((${RETURN_CODE}||$?))

echo "------Test \"COPY FROM STDIN\"------"
/usr/lib/postgresql/"${PSQL_VERSION}"/bin/psql -h localhost -p 4242 -d "${GOOGLE_CLOUD_DATABASE_WITH_VERSION}" -c "COPY users FROM STDIN;" <<EOF
5 5 5
6 6 6
7 7 7
8 8 8
9 9 9
10 10 10
\.
EOF
echo "SELECT * FROM users;" | /usr/lib/postgresql/"${PSQL_VERSION}"/bin/psql -h localhost -p 4242 -d "${GOOGLE_CLOUD_DATABASE_WITH_VERSION}" > .ci/e2e-result/copy-from-stdin.txt
diff -i -w -s .ci/e2e-result/copy-from-stdin.txt .ci/e2e-expected/copy-from-stdin.txt
RETURN_CODE=$((${RETURN_CODE}||$?))
Original file line number Diff line number Diff line change
Expand Up @@ -319,12 +319,16 @@ public synchronized IntermediateStatement getActiveStatement() {
return activeStatementsMap.get(this.connectionId);
}

public synchronized ConnectionStatus getStatus() {
return status;
}

public synchronized void setStatus(ConnectionStatus status) {
this.status = status;
}

/** Status of a {@link ConnectionHandler} */
private enum ConnectionStatus {
public enum ConnectionStatus {
UNAUTHENTICATED,
IDLE,
COPY_IN,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ public MutationWriter getMutationWriter() {
return this.mutationWriter;
}

/** @return 0 for text/csv formatting and 1 for binary */
public int getFormatCode() {
return (options.getFormat() == CopyTreeParser.CopyOptions.Format.BINARY) ? 1 : 0;
}

private void verifyCopyColumns() throws SQLException {
if (options.getColumnNames().size() == 0) {
// Use all columns if none were specified.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package com.google.cloud.spanner.pgadapter.wireprotocol;

import com.google.cloud.spanner.pgadapter.ConnectionHandler;
import com.google.cloud.spanner.pgadapter.ConnectionHandler.ConnectionStatus;
import com.google.cloud.spanner.pgadapter.ConnectionHandler.QueryMode;
import com.google.cloud.spanner.pgadapter.metadata.SendResultSetState;
import com.google.cloud.spanner.pgadapter.statements.IntermediateStatement;
Expand Down Expand Up @@ -49,35 +50,46 @@ public ControlMessage(ConnectionHandler connection) throws IOException {
*/
public static ControlMessage create(ConnectionHandler connection) throws Exception {
char nextMsg = (char) connection.getConnectionMetadata().getInputStream().readUnsignedByte();
switch (nextMsg) {
case QueryMessage.IDENTIFIER:
return new QueryMessage(connection);
case ParseMessage.IDENTIFIER:
return new ParseMessage(connection);
case BindMessage.IDENTIFIER:
return new BindMessage(connection);
case DescribeMessage.IDENTIFIER:
return new DescribeMessage(connection);
case ExecuteMessage.IDENTIFIER:
return new ExecuteMessage(connection);
case CloseMessage.IDENTIFIER:
return new CloseMessage(connection);
case SyncMessage.IDENTIFIER:
return new SyncMessage(connection);
case TerminateMessage.IDENTIFIER:
return new TerminateMessage(connection);
case CopyDoneMessage.IDENTIFIER:
return new CopyDoneMessage(connection);
case CopyDataMessage.IDENTIFIER:
return new CopyDataMessage(connection);
case CopyFailMessage.IDENTIFIER:
return new CopyFailMessage(connection);
case FunctionCallMessage.IDENTIFIER:
return new FunctionCallMessage(connection);
case FlushMessage.IDENTIFIER:
return new FlushMessage(connection);
default:
throw new IllegalStateException("Unknown message");
if (connection.getStatus() == ConnectionStatus.COPY_IN) {
switch (nextMsg) {
case CopyDoneMessage.IDENTIFIER:
return new CopyDoneMessage(connection);
case CopyDataMessage.IDENTIFIER:
return new CopyDataMessage(connection);
default:
tinaspark marked this conversation as resolved.
Show resolved Hide resolved
throw new IllegalStateException("Expected 0 or more Copy Data messages.");
}
} else {
switch (nextMsg) {
case QueryMessage.IDENTIFIER:
return new QueryMessage(connection);
case ParseMessage.IDENTIFIER:
return new ParseMessage(connection);
case BindMessage.IDENTIFIER:
return new BindMessage(connection);
case DescribeMessage.IDENTIFIER:
return new DescribeMessage(connection);
case ExecuteMessage.IDENTIFIER:
return new ExecuteMessage(connection);
case CloseMessage.IDENTIFIER:
return new CloseMessage(connection);
case SyncMessage.IDENTIFIER:
return new SyncMessage(connection);
case TerminateMessage.IDENTIFIER:
return new TerminateMessage(connection);
case CopyDoneMessage.IDENTIFIER:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are we allowed to receive CopyDone/CopyData/CopyFail if we're not in COPY_IN mode?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it should not.

return new CopyDoneMessage(connection);
case CopyDataMessage.IDENTIFIER:
return new CopyDataMessage(connection);
case CopyFailMessage.IDENTIFIER:
return new CopyFailMessage(connection);
case FunctionCallMessage.IDENTIFIER:
return new FunctionCallMessage(connection);
case FlushMessage.IDENTIFIER:
return new FlushMessage(connection);
default:
throw new IllegalStateException("Unknown message");
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,15 @@ protected void sendPayload() throws Exception {
// Spanner returned an error when trying to commit the batch of mutations.
mw.writeMutationsToErrorFile();
mw.closeErrorFile();
this.connection.setStatus(ConnectionStatus.IDLE);
olavloite marked this conversation as resolved.
Show resolved Hide resolved
this.connection.removeActiveStatement(this.statement);
throw e;
}
} else {
mw.closeErrorFile();
}
new ReadyResponse(this.outputStream, ReadyResponse.Status.IDLE).send();
this.connection.setStatus(ConnectionStatus.IDLE);
olavloite marked this conversation as resolved.
Show resolved Hide resolved
this.connection.removeActiveStatement(this.statement);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@

import com.google.cloud.spanner.jdbc.CloudSpannerJdbcConnection;
import com.google.cloud.spanner.pgadapter.ConnectionHandler;
import com.google.cloud.spanner.pgadapter.ConnectionHandler.ConnectionStatus;
import com.google.cloud.spanner.pgadapter.ConnectionHandler.QueryMode;
import com.google.cloud.spanner.pgadapter.statements.CopyStatement;
import com.google.cloud.spanner.pgadapter.statements.IntermediateStatement;
import com.google.cloud.spanner.pgadapter.statements.MatcherStatement;
import com.google.cloud.spanner.pgadapter.utils.StatementParser;
import com.google.cloud.spanner.pgadapter.wireoutput.CopyInResponse;
import com.google.cloud.spanner.pgadapter.wireoutput.ReadyResponse;
import com.google.cloud.spanner.pgadapter.wireoutput.ReadyResponse.Status;
import com.google.cloud.spanner.pgadapter.wireoutput.RowDescriptionResponse;
Expand All @@ -29,13 +32,17 @@
public class QueryMessage extends ControlMessage {

protected static final char IDENTIFIER = 'Q';
protected static final String COPY = "COPY";

private IntermediateStatement statement;

public QueryMessage(ConnectionHandler connection) throws Exception {
super(connection);
String query = StatementParser.removeCommentsAndTrim(this.readAll());
if (!connection.getServer().getOptions().requiresMatcher()) {
String command = StatementParser.parseCommand(query);
if (command.equalsIgnoreCase(COPY)) {
this.statement = new CopyStatement(query, this.connection.getJdbcConnection());
} else if (!connection.getServer().getOptions().requiresMatcher()) {
this.statement = new IntermediateStatement(query, this.connection.getJdbcConnection());
} else {
this.statement = new MatcherStatement(query, this.connection);
Expand All @@ -47,7 +54,9 @@ public QueryMessage(ConnectionHandler connection) throws Exception {
protected void sendPayload() throws Exception {
this.statement.execute();
this.handleQuery();
this.connection.removeActiveStatement(this.statement);
if (!this.statement.getCommand().equalsIgnoreCase(COPY)) {
this.connection.removeActiveStatement(this.statement);
}
}

@Override
Expand Down Expand Up @@ -81,7 +90,15 @@ public void handleQuery() throws Exception {
if (this.statement.hasException()) {
this.handleError(this.statement.getException());
} else {
if (this.statement.containsResultSet()) {
if (this.statement.getCommand().equalsIgnoreCase(COPY)) {
CopyStatement copyStatement = (CopyStatement) this.statement;
new CopyInResponse(
this.outputStream,
copyStatement.getTableColumns().size(),
copyStatement.getFormatCode())
.send();
this.connection.setStatus(ConnectionStatus.COPY_IN);
} else if (this.statement.containsResultSet()) {
new RowDescriptionResponse(
this.outputStream,
this.statement,
Expand Down