Skip to content

Commit

Permalink
Fixed query cancellation bug that intermittently occurs in batch quer…
Browse files Browse the repository at this point in the history
…ies (#1897)

* Fixed query cancellation bug that intermittently occurs in batch queries

* Changed to access to package level

* Comment correction

* Spelling consistency correction
  • Loading branch information
tkyc authored Aug 16, 2022
1 parent 726a786 commit bc30270
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 3 deletions.
2 changes: 1 addition & 1 deletion src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -7500,7 +7500,7 @@ protected void setInterruptsEnabled(boolean interruptsEnabled) {
// Flag set to indicate that an interrupt has happened.
private volatile boolean wasInterrupted = false;

private boolean wasInterrupted() {
boolean wasInterrupted() {
return wasInterrupted;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2868,6 +2868,21 @@ final void doExecutePreparedStatementBatch(PrepStmtBatchExecCmd batchCommand) th
for (int attempt = 1; attempt <= 2; ++attempt) {
try {

// If the command was interrupted, that means the TDS.PKT_CANCEL_REQ was sent to the server.
// Since the cancellation request was sent, stop processing the batch query and process the
// cancellation request and then return.
//
// Otherwise, if we do continue processing the batch query, in the case where a query requires
// prepexec/sp_prepare, the TDS request for prepexec/sp_prepare will be sent regardless of
// query cancellation. This will cause a TDS token error in the post processing when we
// close the query.
if (batchCommand.wasInterrupted()) {
ensureExecuteResultsReader(batchCommand.startResponse(getIsResponseBufferingAdaptive()));
startResults();
getNextResult(true);
return;
}

// Re-use handle if available, requires parameter definitions which are not available until here.
if (reuseCachedHandle(hasNewTypeDefinitions, 1 < attempt)) {
hasNewTypeDefinitions = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ protected Object[][] getContents() {
{"R_noServerResponse", "SQL Server did not return a response. The connection has been closed."},
{"R_truncatedServerResponse", "SQL Server returned an incomplete response. The connection has been closed."},
{"R_queryTimedOut", "The query has timed out."},
{"R_queryCancelled", "The query was canceled."},
{"R_queryCancelled", "The query was cancelled."},
{"R_errorReadingStream", "An error occurred while reading the value from the stream object. Error: \"{0}\""},
{"R_streamReadReturnedInvalidValue", "The stream read operation returned an invalid value for the amount of data read."},
{"R_mismatchedStreamLength", "The stream value is not the specified length. The specified length was {0}, the actual length is {1}."},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,6 @@ protected Object[][] getContents() {
{"R_cancellationFailed", "Cancellation failed."}, {"R_executionNotTimeout", "Execution did not timeout."},
{"R_executionTooLong", "Execution took too long."},
{"R_executionNotLong", "Execution did not take long enough."},
{"R_queryCancelled", "The query was canceled."},
{"R_statementShouldBeClosed", "statement should be closed since resultset is closed."},
{"R_statementShouldBeOpened", "statement should be opened since resultset is opened."},
{"R_shouldBeWrapper", "{0} should be a wrapper for {1}."},
Expand Down Expand Up @@ -201,5 +200,6 @@ protected Object[][] getContents() {
{"R_objectNullOrEmpty", "The {0} is null or empty."},
{"R_cekDecryptionFailed", "Failed to decrypt a column encryption key using key store provider: {0}."},
{"R_connectTimedOut", "connect timed out"},
{"R_queryCancelled", "The query was cancelled."},
{"R_sessionKilled", "Cannot continue the execution because the session is in the kill state"}};
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package com.microsoft.sqlserver.jdbc.unit.statement;

import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;

import java.lang.reflect.Field;
Expand Down Expand Up @@ -69,6 +70,51 @@ public void testBatchSpPrepare() throws Exception {
testExecuteBatch1UseBulkCopyAPI();
}

@Test
public void testBatchStatementCancellation() throws Exception {
try (Connection connection = PrepUtil.getConnection(connectionString)) {
connection.setAutoCommit(false);

try (PreparedStatement statement = connection.prepareStatement(
"if object_id('test_table') is not null drop table test_table")) {
statement.execute();
}
connection.commit();

try (PreparedStatement statement = connection.prepareStatement(
"create table test_table (column_name bit)")) {
statement.execute();
}
connection.commit();

for (long delayInMilliseconds : new long[] { 1, 2, 4, 8, 16, 32, 64, 128 }) {
for (int numberOfCommands : new int[] { 1, 2, 4, 8, 16, 32, 64 }) {
int parameterCount = 512;

try (PreparedStatement statement = connection.prepareStatement(
"insert into test_table values (?)" + repeat(",(?)", parameterCount - 1))) {

for (int i = 0; i < numberOfCommands; i++) {
for (int j = 0; j < parameterCount; j++) {
statement.setBoolean(j + 1, true);
}
statement.addBatch();
}

Thread cancelThread = cancelAsync(statement, delayInMilliseconds);
try {
statement.executeBatch();
} catch (SQLException e) {
assertEquals(TestResource.getResource("R_queryCancelled"), e.getMessage());
}
cancelThread.join();
}
connection.commit();
}
}
}
}

/**
* Get a PreparedStatement object and call the addBatch() method with 3 SQL statements and call the executeBatch()
* method and it should return array of Integer values of length 3
Expand Down Expand Up @@ -240,6 +286,29 @@ private void modifyConnectionForBulkCopyAPI(SQLServerConnection con) throws Exce
con.setUseBulkCopyForBatchInsert(true);
}

private static String repeat(String string, int count) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < count; i++) {
sb.append(string);
}
return sb.toString();
}

private static Thread cancelAsync(Statement statement, long delayInMilliseconds) {
Thread thread = new Thread(() -> {
try {
Thread.sleep(delayInMilliseconds);
statement.cancel();
} catch (SQLException | InterruptedException e) {
// does not/must not happen
e.printStackTrace();
throw new IllegalStateException(e);
}
});
thread.start();
return thread;
}

@BeforeAll
public static void testSetup() throws TestAbortedException, Exception {
setConnection();
Expand Down

0 comments on commit bc30270

Please sign in to comment.