Skip to content

Commit

Permalink
Use synchronous I/O for sync methods. Fixes #62
Browse files Browse the repository at this point in the history
  • Loading branch information
bgrainger committed Sep 27, 2016
1 parent 8c04e36 commit dc63b88
Show file tree
Hide file tree
Showing 8 changed files with 204 additions and 130 deletions.
18 changes: 18 additions & 0 deletions src/MySqlConnector/IOKind.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
namespace MySql.Data
{
/// <summary>
/// Specifies whether to perform synchronous or asynchronous I/O.
/// </summary>
internal enum IOKind
{
/// <summary>
/// Use synchronous I/O.
/// </summary>
Synchronous,

/// <summary>
/// Use asynchronous I/O.
/// </summary>
Asynchronous,
}
}
22 changes: 10 additions & 12 deletions src/MySqlConnector/MySqlClient/ConnectionPool.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -9,8 +8,7 @@ namespace MySql.Data.MySqlClient
{
internal sealed class ConnectionPool
{

public async Task<MySqlSession> GetSessionAsync(CancellationToken cancellationToken)
public async Task<MySqlSession> GetSessionAsync(IOKind ioKind, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();

Expand All @@ -26,25 +24,25 @@ public async Task<MySqlSession> GetSessionAsync(CancellationToken cancellationTo
// check for a pooled session
if (m_sessions.TryDequeue(out session))
{
if (!await session.TryPingAsync(cancellationToken).ConfigureAwait(false))
if (!await session.TryPingAsync(ioKind, cancellationToken).ConfigureAwait(false))
{
// session is not valid
await session.DisposeAsync(cancellationToken).ConfigureAwait(false);
await session.DisposeAsync(ioKind, cancellationToken).ConfigureAwait(false);
}
else
{
// session is valid, reset if supported
if (m_resetConnections)
{
await session.ResetConnectionAsync(m_userId, m_password, m_database, cancellationToken).ConfigureAwait(false);
await session.ResetConnectionAsync(m_userId, m_password, m_database, ioKind, cancellationToken).ConfigureAwait(false);
}
// pooled session is ready to be used; return it
return session;
}
}

session = new MySqlSession(this);
await session.ConnectAsync(m_servers, m_port, m_userId, m_password, m_database, m_connectionTimeout, cancellationToken).ConfigureAwait(false);
await session.ConnectAsync(m_servers, m_port, m_userId, m_password, m_database, m_connectionTimeout, ioKind, cancellationToken).ConfigureAwait(false);
return session;
}
catch
Expand All @@ -66,7 +64,7 @@ public void Return(MySqlSession session)
}
}

public async Task ClearAsync(CancellationToken cancellationToken)
public async Task ClearAsync(IOKind ioKind, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
try
Expand All @@ -84,7 +82,7 @@ public async Task ClearAsync(CancellationToken cancellationToken)
MySqlSession session;
while (m_sessions.TryDequeue(out session))
{
tasks.Add(session.DisposeAsync(cancellationToken));
tasks.Add(session.DisposeAsync(ioKind, cancellationToken));
}
if (tasks.Count > 0)
{
Expand Down Expand Up @@ -118,12 +116,12 @@ public static ConnectionPool GetPool(MySqlConnectionStringBuilder csb)
return pool;
}

public static async Task ClearPoolsAsync(CancellationToken cancellationToken)
public static async Task ClearPoolsAsync(IOKind ioKind, CancellationToken cancellationToken)
{
var pools = new List<ConnectionPool>(s_pools.Values);

foreach (var pool in pools)
await pool.ClearAsync(cancellationToken).ConfigureAwait(false);
await pool.ClearAsync(ioKind, cancellationToken).ConfigureAwait(false);
}

private ConnectionPool(IEnumerable<string> servers, int port, string userId, string password, string database, int connectionTimeout,
Expand Down
37 changes: 23 additions & 14 deletions src/MySqlConnector/MySqlClient/MySqlCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ public override void Cancel()
}

public override int ExecuteNonQuery()
=> ExecuteNonQueryAsync(CancellationToken.None).GetAwaiter().GetResult();
=> ExecuteNonQueryAsync(IOKind.Synchronous, CancellationToken.None).GetAwaiter().GetResult();

public override object ExecuteScalar()
=> ExecuteScalarAsync(CancellationToken.None).GetAwaiter().GetResult();
=> ExecuteScalarAsync(IOKind.Synchronous, CancellationToken.None).GetAwaiter().GetResult();

public override void Prepare()
{
Expand Down Expand Up @@ -103,37 +103,46 @@ protected override DbParameter CreateDbParameter()
}

protected override DbDataReader ExecuteDbDataReader(CommandBehavior behavior)
=> ExecuteDbDataReaderAsync(behavior, CancellationToken.None).GetAwaiter().GetResult();
=> ExecuteReaderAsync(behavior, IOKind.Synchronous, CancellationToken.None).GetAwaiter().GetResult();

public override async Task<int> ExecuteNonQueryAsync(CancellationToken cancellationToken)
public override Task<int> ExecuteNonQueryAsync(CancellationToken cancellationToken) =>
ExecuteNonQueryAsync(IOKind.Asynchronous, cancellationToken);

internal async Task<int> ExecuteNonQueryAsync(IOKind ioKind, CancellationToken cancellationToken)
{
using (var reader = await ExecuteReaderAsync(cancellationToken).ConfigureAwait(false))
using (var reader = (MySqlDataReader) await ExecuteReaderAsync(CommandBehavior.Default, ioKind, cancellationToken).ConfigureAwait(false))
{
do
{
while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
while (await reader.ReadAsync(ioKind, cancellationToken).ConfigureAwait(false))
{
}
} while (await reader.NextResultAsync(cancellationToken).ConfigureAwait(false));
} while (await reader.NextResultAsync(ioKind, cancellationToken).ConfigureAwait(false));
return reader.RecordsAffected;
}
}

public override async Task<object> ExecuteScalarAsync(CancellationToken cancellationToken)
public override Task<object> ExecuteScalarAsync(CancellationToken cancellationToken) =>
ExecuteScalarAsync(IOKind.Asynchronous, cancellationToken);

internal async Task<object> ExecuteScalarAsync(IOKind ioKind, CancellationToken cancellationToken)
{
object result = null;
using (var reader = await ExecuteReaderAsync(CommandBehavior.SingleResult | CommandBehavior.SingleRow, cancellationToken).ConfigureAwait(false))
using (var reader = (MySqlDataReader) await ExecuteReaderAsync(CommandBehavior.SingleResult | CommandBehavior.SingleRow, ioKind, cancellationToken).ConfigureAwait(false))
{
do
{
if (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
if (await reader.ReadAsync(ioKind, cancellationToken).ConfigureAwait(false))
result = reader.GetValue(0);
} while (await reader.NextResultAsync(cancellationToken).ConfigureAwait(false));
} while (await reader.NextResultAsync(ioKind, cancellationToken).ConfigureAwait(false));
}
return result;
}

protected override async Task<DbDataReader> ExecuteDbDataReaderAsync(CommandBehavior behavior, CancellationToken cancellationToken)
protected override Task<DbDataReader> ExecuteDbDataReaderAsync(CommandBehavior behavior, CancellationToken cancellationToken) =>
ExecuteReaderAsync(behavior, IOKind.Asynchronous, cancellationToken);

internal async Task<DbDataReader> ExecuteReaderAsync(CommandBehavior behavior, IOKind ioKind, CancellationToken cancellationToken)
{
VerifyValid();
Connection.HasActiveReader = true;
Expand All @@ -151,8 +160,8 @@ protected override async Task<DbDataReader> ExecuteDbDataReaderAsync(CommandBeha
var preparer = new MySqlStatementPreparer(CommandText, m_parameterCollection, statementPreparerOptions);
preparer.BindParameters();
var payload = new PayloadData(new ArraySegment<byte>(Payload.CreateEofStringPayload(CommandKind.Query, preparer.PreparedSql)));
await Session.SendAsync(payload, cancellationToken).ConfigureAwait(false);
reader = await MySqlDataReader.CreateAsync(this, behavior, cancellationToken).ConfigureAwait(false);
await Session.SendAsync(payload, ioKind, cancellationToken).ConfigureAwait(false);
reader = await MySqlDataReader.CreateAsync(this, behavior, ioKind, cancellationToken).ConfigureAwait(false);
return reader;
}
finally
Expand Down
42 changes: 23 additions & 19 deletions src/MySqlConnector/MySqlClient/MySqlConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ public MySqlConnection(string connectionString)
public new MySqlTransaction BeginTransaction() => (MySqlTransaction) base.BeginTransaction();

public Task<MySqlTransaction> BeginTransactionAsync(CancellationToken cancellationToken = default(CancellationToken)) =>
BeginDbTransactionAsync(IsolationLevel.Unspecified, cancellationToken);
BeginDbTransactionAsync(IsolationLevel.Unspecified, IOKind.Asynchronous, cancellationToken);

public Task<MySqlTransaction> BeginTransactionAsync(IsolationLevel isolationLevel, CancellationToken cancellationToken = default(CancellationToken)) =>
BeginDbTransactionAsync(isolationLevel, cancellationToken);
BeginDbTransactionAsync(isolationLevel, IOKind.Asynchronous, cancellationToken);

protected override DbTransaction BeginDbTransaction(IsolationLevel isolationLevel) =>
BeginDbTransactionAsync(isolationLevel).GetAwaiter().GetResult();
BeginDbTransactionAsync(isolationLevel, IOKind.Synchronous, CancellationToken.None).GetAwaiter().GetResult();

private async Task<MySqlTransaction> BeginDbTransactionAsync(IsolationLevel isolationLevel, CancellationToken cancellationToken = default(CancellationToken))
private async Task<MySqlTransaction> BeginDbTransactionAsync(IsolationLevel isolationLevel, IOKind ioKind, CancellationToken cancellationToken)
{
if (State != ConnectionState.Open)
throw new InvalidOperationException("Connection is not open.");
Expand Down Expand Up @@ -67,7 +67,7 @@ protected override DbTransaction BeginDbTransaction(IsolationLevel isolationLeve
}

using (var cmd = new MySqlCommand("set session transaction isolation level " + isolationLevelValue + "; start transaction;", this))
await cmd.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
await cmd.ExecuteNonQueryAsync(ioKind, cancellationToken).ConfigureAwait(false);

var transaction = new MySqlTransaction(this, isolationLevel);
CurrentTransaction = transaction;
Expand All @@ -88,9 +88,12 @@ public override void ChangeDatabase(string databaseName)
throw new NotImplementedException();
}

public override void Open() => OpenAsync(CancellationToken.None).GetAwaiter().GetResult();
public override void Open() => OpenAsync(IOKind.Synchronous, CancellationToken.None).GetAwaiter().GetResult();

public override async Task OpenAsync(CancellationToken cancellationToken)
public override Task OpenAsync(CancellationToken cancellationToken) =>
OpenAsync(IOKind.Asynchronous, cancellationToken);

private async Task OpenAsync(IOKind ioKind, CancellationToken cancellationToken)
{
VerifyNotDisposed();
if (State != ConnectionState.Closed)
Expand All @@ -104,7 +107,7 @@ public override async Task OpenAsync(CancellationToken cancellationToken)

try
{
m_session = await CreateSessionAsync(cancellationToken).ConfigureAwait(false);
m_session = await CreateSessionAsync(ioKind, cancellationToken).ConfigureAwait(false);

m_hasBeenOpened = true;
SetState(ConnectionState.Open);
Expand Down Expand Up @@ -147,20 +150,21 @@ public override string ConnectionString

public override string ServerVersion => m_session.ServerVersion.OriginalString;

public static void ClearPool(MySqlConnection connection) => ClearPoolAsync(connection, CancellationToken.None).GetAwaiter().GetResult();
public static void ClearAllPools() => ClearAllPoolsAsync(CancellationToken.None).GetAwaiter().GetResult();
public static Task ClearPoolAsync(MySqlConnection connection) => ClearPoolAsync(connection, CancellationToken.None);
public static Task ClearAllPoolsAsync() => ClearAllPoolsAsync(CancellationToken.None);
public static Task ClearAllPoolsAsync(CancellationToken cancellationToken) => ConnectionPool.ClearPoolsAsync(cancellationToken);
public static void ClearPool(MySqlConnection connection) => ClearPoolAsync(connection, IOKind.Synchronous, CancellationToken.None).GetAwaiter().GetResult();
public static Task ClearPoolAsync(MySqlConnection connection) => ClearPoolAsync(connection, IOKind.Asynchronous, CancellationToken.None);
public static Task ClearPoolAsync(MySqlConnection connection, CancellationToken cancellationToken) => ClearPoolAsync(connection, IOKind.Asynchronous, cancellationToken);
public static void ClearAllPools() => ConnectionPool.ClearPoolsAsync(IOKind.Synchronous, CancellationToken.None).GetAwaiter().GetResult();
public static Task ClearAllPoolsAsync() => ConnectionPool.ClearPoolsAsync(IOKind.Asynchronous, CancellationToken.None);
public static Task ClearAllPoolsAsync(CancellationToken cancellationToken) => ConnectionPool.ClearPoolsAsync(IOKind.Asynchronous, cancellationToken);

public static async Task ClearPoolAsync(MySqlConnection connection, CancellationToken cancellationToken)
private static async Task ClearPoolAsync(MySqlConnection connection, IOKind ioKind, CancellationToken cancellationToken)
{
if (connection == null)
throw new ArgumentNullException(nameof(connection));

var pool = ConnectionPool.GetPool(connection.m_connectionStringBuilder);
if (pool != null)
await pool.ClearAsync(cancellationToken).ConfigureAwait(false);
await pool.ClearAsync(ioKind, cancellationToken).ConfigureAwait(false);
}

protected override DbCommand CreateDbCommand() => new MySqlCommand(this, CurrentTransaction);
Expand Down Expand Up @@ -203,20 +207,20 @@ internal MySqlSession Session
internal bool ConvertZeroDateTime => m_connectionStringBuilder.ConvertZeroDateTime;
internal bool OldGuids => m_connectionStringBuilder.OldGuids;

private async Task<MySqlSession> CreateSessionAsync(CancellationToken cancellationToken)
private async Task<MySqlSession> CreateSessionAsync(IOKind ioKind, CancellationToken cancellationToken)
{
// get existing session from the pool if possible
if (m_connectionStringBuilder.Pooling)
{
var pool = ConnectionPool.GetPool(m_connectionStringBuilder);
// this returns an open session
return await pool.GetSessionAsync(cancellationToken).ConfigureAwait(false);
return await pool.GetSessionAsync(ioKind, cancellationToken).ConfigureAwait(false);
}
else
{
var session = new MySqlSession(null);
await session.ConnectAsync(m_connectionStringBuilder.Server.Split(','), (int) m_connectionStringBuilder.Port, m_connectionStringBuilder.UserID,
m_connectionStringBuilder.Password, m_connectionStringBuilder.Database, (int) m_connectionStringBuilder.ConnectionTimeout, cancellationToken).ConfigureAwait(false);
m_connectionStringBuilder.Password, m_connectionStringBuilder.Database, (int) m_connectionStringBuilder.ConnectionTimeout, ioKind, cancellationToken).ConfigureAwait(false);
return session;
}
}
Expand Down Expand Up @@ -251,7 +255,7 @@ private void DoClose()
if (m_connectionStringBuilder.Pooling)
m_session.ReturnToPool();
else
m_session.DisposeAsync(CancellationToken.None).GetAwaiter().GetResult();
m_session.DisposeAsync(IOKind.Synchronous, CancellationToken.None).GetAwaiter().GetResult();
m_session = null;
}
SetState(ConnectionState.Closed);
Expand Down
36 changes: 20 additions & 16 deletions src/MySqlConnector/MySqlClient/MySqlDataReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,18 @@ namespace MySql.Data.MySqlClient
{
public sealed class MySqlDataReader : DbDataReader
{
public override bool NextResult()
{
return NextResultAsync(CancellationToken.None).GetAwaiter().GetResult();
}
public override bool NextResult() =>
NextResultAsync(IOKind.Synchronous, CancellationToken.None).GetAwaiter().GetResult();

public override async Task<bool> NextResultAsync(CancellationToken cancellationToken)
public override Task<bool> NextResultAsync(CancellationToken cancellationToken) =>
NextResultAsync(IOKind.Asynchronous, cancellationToken);

internal async Task<bool> NextResultAsync(IOKind ioKind, CancellationToken cancellationToken)
{
VerifyNotDisposed();

while (m_state == State.ReadingRows || m_state == State.ReadResultSetHeader)
await ReadAsync(cancellationToken).ConfigureAwait(false);
await ReadAsync(ioKind, cancellationToken).ConfigureAwait(false);

var oldState = m_state;
Reset();
Expand All @@ -31,17 +32,20 @@ public override async Task<bool> NextResultAsync(CancellationToken cancellationT
if (oldState != State.HasMoreData)
throw new InvalidOperationException("Invalid state: {0}".FormatInvariant(oldState));

await ReadResultSetHeaderAsync(cancellationToken).ConfigureAwait(false);
await ReadResultSetHeaderAsync(ioKind, cancellationToken).ConfigureAwait(false);
return true;
}

public override bool Read()
{
VerifyNotDisposed();
return ReadAsync(CancellationToken.None).GetAwaiter().GetResult();
return ReadAsync(IOKind.Synchronous, CancellationToken.None).GetAwaiter().GetResult();
}

public override Task<bool> ReadAsync(CancellationToken cancellationToken)
public override Task<bool> ReadAsync(CancellationToken cancellationToken) =>
ReadAsync(IOKind.Asynchronous, cancellationToken);

internal Task<bool> ReadAsync(IOKind ioKind, CancellationToken cancellationToken)
{
VerifyNotDisposed();

Expand All @@ -51,7 +55,7 @@ public override Task<bool> ReadAsync(CancellationToken cancellationToken)

if (m_state != State.AlreadyReadFirstRow)
{
var payloadTask = m_session.ReceiveReplyAsync(cancellationToken);
var payloadTask = m_session.ReceiveReplyAsync(ioKind, cancellationToken);
if (payloadTask.IsCompletedSuccessfully)
return ReadAsyncRemainder(payloadTask.Result) ? s_trueTask : s_falseTask;
return ReadAsyncAwaited(payloadTask.AsTask());
Expand Down Expand Up @@ -645,10 +649,10 @@ private void DoClose()
}
}

internal static async Task<MySqlDataReader> CreateAsync(MySqlCommand command, CommandBehavior behavior, CancellationToken cancellationToken)
internal static async Task<MySqlDataReader> CreateAsync(MySqlCommand command, CommandBehavior behavior, IOKind ioKind, CancellationToken cancellationToken)
{
var dataReader = new MySqlDataReader(command, behavior);
await dataReader.ReadResultSetHeaderAsync(cancellationToken).ConfigureAwait(false);
await dataReader.ReadResultSetHeaderAsync(ioKind, cancellationToken).ConfigureAwait(false);
return dataReader;
}

Expand Down Expand Up @@ -709,11 +713,11 @@ private MySqlDataReader(MySqlCommand command, CommandBehavior behavior)

private MySqlConnection Connection => m_command.Connection;

private async Task ReadResultSetHeaderAsync(CancellationToken cancellationToken)
private async Task ReadResultSetHeaderAsync(IOKind ioKind, CancellationToken cancellationToken)
{
while (true)
{
var payload = await m_session.ReceiveReplyAsync(cancellationToken).ConfigureAwait(false);
var payload = await m_session.ReceiveReplyAsync(ioKind, cancellationToken).ConfigureAwait(false);

var firstByte = payload.HeaderByte;
if (firstByte == OkPayload.Signature)
Expand All @@ -740,11 +744,11 @@ private async Task ReadResultSetHeaderAsync(CancellationToken cancellationToken)

for (var column = 0; column < m_columnDefinitions.Length; column++)
{
payload = await m_session.ReceiveReplyAsync(cancellationToken).ConfigureAwait(false);
payload = await m_session.ReceiveReplyAsync(ioKind, cancellationToken).ConfigureAwait(false);
m_columnDefinitions[column] = ColumnDefinitionPayload.Create(payload);
}

payload = await m_session.ReceiveReplyAsync(cancellationToken).ConfigureAwait(false);
payload = await m_session.ReceiveReplyAsync(ioKind, cancellationToken).ConfigureAwait(false);
EofPayload.Create(payload);

m_command.LastInsertedId = -1;
Expand Down
Loading

0 comments on commit dc63b88

Please sign in to comment.