Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed query cancellation bug that intermittently occurs in batch queries #1897

Merged
merged 4 commits into from
Aug 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -7500,7 +7500,7 @@ protected void setInterruptsEnabled(boolean interruptsEnabled) {
// Flag set to indicate that an interrupt has happened.
private volatile boolean wasInterrupted = false;

private boolean wasInterrupted() {
boolean wasInterrupted() {
return wasInterrupted;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2868,6 +2868,21 @@ final void doExecutePreparedStatementBatch(PrepStmtBatchExecCmd batchCommand) th
for (int attempt = 1; attempt <= 2; ++attempt) {
try {

// If the command was interrupted, that means the TDS.PKT_CANCEL_REQ was sent to the server.
// Since the cancellation request was sent, stop processing the batch query and process the
// cancellation request and then return.
//
// Otherwise, if we do continue processing the batch query, in the case where a query requires
// prepexec/sp_prepare, the TDS request for prepexec/sp_prepare will be sent regardless of
// query cancellation. This will cause a TDS token error in the post processing when we
// close the query.
if (batchCommand.wasInterrupted()) {
ensureExecuteResultsReader(batchCommand.startResponse(getIsResponseBufferingAdaptive()));
startResults();
getNextResult(true);
return;
}

// Re-use handle if available, requires parameter definitions which are not available until here.
if (reuseCachedHandle(hasNewTypeDefinitions, 1 < attempt)) {
hasNewTypeDefinitions = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ protected Object[][] getContents() {
{"R_noServerResponse", "SQL Server did not return a response. The connection has been closed."},
{"R_truncatedServerResponse", "SQL Server returned an incomplete response. The connection has been closed."},
{"R_queryTimedOut", "The query has timed out."},
{"R_queryCancelled", "The query was canceled."},
{"R_queryCancelled", "The query was cancelled."},
{"R_errorReadingStream", "An error occurred while reading the value from the stream object. Error: \"{0}\""},
{"R_streamReadReturnedInvalidValue", "The stream read operation returned an invalid value for the amount of data read."},
{"R_mismatchedStreamLength", "The stream value is not the specified length. The specified length was {0}, the actual length is {1}."},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,6 @@ protected Object[][] getContents() {
{"R_cancellationFailed", "Cancellation failed."}, {"R_executionNotTimeout", "Execution did not timeout."},
{"R_executionTooLong", "Execution took too long."},
{"R_executionNotLong", "Execution did not take long enough."},
{"R_queryCancelled", "The query was canceled."},
{"R_statementShouldBeClosed", "statement should be closed since resultset is closed."},
{"R_statementShouldBeOpened", "statement should be opened since resultset is opened."},
{"R_shouldBeWrapper", "{0} should be a wrapper for {1}."},
Expand Down Expand Up @@ -201,5 +200,6 @@ protected Object[][] getContents() {
{"R_objectNullOrEmpty", "The {0} is null or empty."},
{"R_cekDecryptionFailed", "Failed to decrypt a column encryption key using key store provider: {0}."},
{"R_connectTimedOut", "connect timed out"},
{"R_queryCancelled", "The query was cancelled."},
{"R_sessionKilled", "Cannot continue the execution because the session is in the kill state"}};
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package com.microsoft.sqlserver.jdbc.unit.statement;

import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;

import java.lang.reflect.Field;
Expand Down Expand Up @@ -69,6 +70,51 @@ public void testBatchSpPrepare() throws Exception {
testExecuteBatch1UseBulkCopyAPI();
}

@Test
public void testBatchStatementCancellation() throws Exception {
try (Connection connection = PrepUtil.getConnection(connectionString)) {
connection.setAutoCommit(false);

try (PreparedStatement statement = connection.prepareStatement(
"if object_id('test_table') is not null drop table test_table")) {
statement.execute();
}
connection.commit();

try (PreparedStatement statement = connection.prepareStatement(
"create table test_table (column_name bit)")) {
statement.execute();
}
connection.commit();

for (long delayInMilliseconds : new long[] { 1, 2, 4, 8, 16, 32, 64, 128 }) {
for (int numberOfCommands : new int[] { 1, 2, 4, 8, 16, 32, 64 }) {
int parameterCount = 512;

try (PreparedStatement statement = connection.prepareStatement(
"insert into test_table values (?)" + repeat(",(?)", parameterCount - 1))) {

for (int i = 0; i < numberOfCommands; i++) {
for (int j = 0; j < parameterCount; j++) {
statement.setBoolean(j + 1, true);
}
statement.addBatch();
}

Thread cancelThread = cancelAsync(statement, delayInMilliseconds);
try {
statement.executeBatch();
} catch (SQLException e) {
assertEquals(TestResource.getResource("R_queryCancelled"), e.getMessage());
}
cancelThread.join();
}
connection.commit();
}
}
}
}

/**
* Get a PreparedStatement object and call the addBatch() method with 3 SQL statements and call the executeBatch()
* method and it should return array of Integer values of length 3
Expand Down Expand Up @@ -240,6 +286,29 @@ private void modifyConnectionForBulkCopyAPI(SQLServerConnection con) throws Exce
con.setUseBulkCopyForBatchInsert(true);
}

private static String repeat(String string, int count) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < count; i++) {
sb.append(string);
}
return sb.toString();
}

private static Thread cancelAsync(Statement statement, long delayInMilliseconds) {
Thread thread = new Thread(() -> {
try {
Thread.sleep(delayInMilliseconds);
statement.cancel();
} catch (SQLException | InterruptedException e) {
// does not/must not happen
e.printStackTrace();
throw new IllegalStateException(e);
}
});
thread.start();
return thread;
}

@BeforeAll
public static void testSetup() throws TestAbortedException, Exception {
setConnection();
Expand Down