Skip to content

Commit

Permalink
Implement MySqlCommand.Cancel.
Browse files Browse the repository at this point in the history
Opens a separate connection to the server to execute 'KILL QUERY n' for the current connection.
  • Loading branch information
bgrainger committed Apr 11, 2017
1 parent 596a139 commit 3e4d8f5
Show file tree
Hide file tree
Showing 8 changed files with 304 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public virtual async Task<object> ExecuteScalarAsync(string commandText, MySqlPa
public virtual async Task<DbDataReader> ExecuteReaderAsync(string commandText, MySqlParameterCollection parameterCollection,
CommandBehavior behavior, IOBehavior ioBehavior, CancellationToken cancellationToken)
{
m_command.Connection.Session.StartQuerying();
m_command.Connection.Session.StartQuerying(m_command);
m_command.LastInsertedId = -1;
var statementPreparerOptions = StatementPreparerOptions.None;
if (m_command.Connection.AllowUserVariables || m_command.CommandType == CommandType.StoredProcedure)
Expand Down
6 changes: 1 addition & 5 deletions src/MySqlConnector/MySqlClient/MySqlCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,7 @@ public MySqlCommand(string commandText, MySqlConnection connection, MySqlTransac
}
}

public override void Cancel()
{
// documentation says this shouldn't throw (but just fail silently), but for now make it explicit that this doesn't work
throw new NotSupportedException("Use the Async overloads with a CancellationToken.");
}
public override void Cancel() => Connection.Cancel(this);

public override int ExecuteNonQuery() =>
ExecuteNonQueryAsync(IOBehavior.Synchronous, CancellationToken.None).GetAwaiter().GetResult();
Expand Down
31 changes: 31 additions & 0 deletions src/MySqlConnector/MySqlClient/MySqlConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,37 @@ internal MySqlSession Session
}
}

internal void Cancel(MySqlCommand command)
{
var session = Session;
if (!session.TryStartCancel(command))
return;

try
{
// open a dedicated connection to the server to kill the active query
var csb = new MySqlConnectionStringBuilder(m_connectionStringBuilder.GetConnectionString(includePassword: true));
csb.Pooling = false;
if (m_session.IPAddress != null)
csb.Server = m_session.IPAddress.ToString();
csb.ConnectionTimeout = 3u;

using (var connection = new MySqlConnection(csb.ConnectionString))
{
connection.Open();
using (var killCommand = new MySqlCommand("KILL QUERY {0}".FormatInvariant(command.Connection.ServerThread), connection))
{
session.DoCancel(command, killCommand);
}
}
}
catch (MySqlException)
{
// cancelling the query failed; setting the state back to 'Querying' will allow another call to 'Cancel' to try again
session.AbortCancel(command);
}
}

internal async Task<CachedProcedure> GetCachedProcedure(IOBehavior ioBehavior, string name, CancellationToken cancellationToken)
{
if (State != ConnectionState.Open)
Expand Down
10 changes: 9 additions & 1 deletion src/MySqlConnector/MySqlClient/MySqlDataReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -257,9 +257,17 @@ private void DoClose()
{
if (Command != null)
{
while (NextResult())
try
{
while (NextResult())
{
}
}
catch (MySqlException ex) when (ex.Number == (int) MySqlErrorCode.QueryInterrupted)
{
// ignore "Query execution was interrupted" exceptions when closing a data reader
}

m_resultSet = null;
m_resultSetBuffered = null;
m_nextResultSetBuffer.Clear();
Expand Down
5 changes: 5 additions & 0 deletions src/MySqlConnector/MySqlClient/MySqlErrorCode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,10 @@ public enum MySqlErrorCode
/// You have an error in your SQL syntax (ER_PARSE_ERROR).
/// </summary>
ParseError = 1064,

/// <summary>
/// Query execution was interrupted (ER_QUERY_INTERRUPTED).
/// </summary>
QueryInterrupted = 1317,
}
}
13 changes: 12 additions & 1 deletion src/MySqlConnector/MySqlClient/Results/ResultSet.cs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,18 @@ private ValueTask<Row> ScanRowAsync(IOBehavior ioBehavior, Row row, Cancellation
? new ValueTask<Row>(ScanRowAsyncRemainder(payloadValueTask.Result))
: new ValueTask<Row>(ScanRowAsyncAwaited(payloadValueTask.AsTask()));

async Task<Row> ScanRowAsyncAwaited(Task<PayloadData> payloadTask) => ScanRowAsyncRemainder(await payloadTask.ConfigureAwait(false));
async Task<Row> ScanRowAsyncAwaited(Task<PayloadData> payloadTask)
{
try
{
return ScanRowAsyncRemainder(await payloadTask.ConfigureAwait(false));
}
catch (MySqlException ex) when (ex.Number == (int) MySqlErrorCode.QueryInterrupted)
{
BufferState = State = ResultSetState.NoMoreData;
throw;
}
}

Row ScanRowAsyncRemainder(PayloadData payload)
{
Expand Down
54 changes: 50 additions & 4 deletions src/MySqlConnector/Serialization/MySqlSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public MySqlSession(ConnectionPool pool, int poolGeneration)
public ConnectionPool Pool { get; }
public int PoolGeneration { get; }
public string DatabaseOverride { get; set; }
public IPAddress IPAddress => (m_tcpClient?.Client.RemoteEndPoint as IPEndPoint)?.Address;

public void ReturnToPool() => Pool?.Return(this);

Expand All @@ -48,23 +49,66 @@ public bool IsConnected
}
}

public void StartQuerying()
public bool TryStartCancel(MySqlCommand command)
{
lock (m_lock)
{
if (m_state == State.Querying)
if (m_activeCommand != command)
return false;
VerifyState(State.Querying, State.CancelingQuery);
if (m_state != State.Querying)
return false;
m_state = State.CancelingQuery;
}

return true;
}

public void DoCancel(MySqlCommand commandToCancel, MySqlCommand killCommand)
{
lock (m_lock)
{
if (m_activeCommand != commandToCancel)
return;

// NOTE: This command is executed while holding the lock to prevent race conditions during asynchronous cancellation.
// For example, if the lock weren't held, the current command could finish and the other thread could set m_activeCommand
// to null, then start executing a new command. By the time this "KILL QUERY" command reached the server, the wrong
// command would be killed (because "KILL QUERY" specifies the connection whose command should be killed, not
// a unique identifier of the command itself). As a mitigation, we set the CommandTimeout to a low value to avoid
// blocking the other thread for an extended duration.
killCommand.CommandTimeout = 3;
killCommand.ExecuteNonQuery();
}
}

public void AbortCancel(MySqlCommand command)
{
lock (m_lock)
{
if (m_activeCommand == command && m_state == State.CancelingQuery)
m_state = State.Querying;
}
}

public void StartQuerying(MySqlCommand command)
{
lock (m_lock)
{
if (m_state == State.Querying || m_state == State.CancelingQuery)
throw new MySqlException("There is already an open DataReader associated with this Connection which must be closed first.");

VerifyState(State.Connected);
m_state = State.Querying;
m_activeCommand = command;
}
}

public MySqlDataReader ActiveReader => m_activeReader;

public void SetActiveReader(MySqlDataReader dataReader)
{
VerifyState(State.Querying);
VerifyState(State.Querying, State.CancelingQuery);
if (dataReader == null)
throw new ArgumentNullException(nameof(dataReader));
if (m_activeReader != null)
Expand All @@ -76,9 +120,10 @@ public void FinishQuerying()
{
lock (m_lock)
{
VerifyState(State.Querying);
VerifyState(State.Querying, State.CancelingQuery);
m_state = State.Connected;
m_activeReader = null;
m_activeCommand = null;
}
}

Expand Down Expand Up @@ -634,6 +679,7 @@ private enum State
Socket m_socket;
NetworkStream m_networkStream;
IPayloadHandler m_payloadHandler;
MySqlCommand m_activeCommand;
MySqlDataReader m_activeReader;
}
}
Loading

0 comments on commit 3e4d8f5

Please sign in to comment.