Skip to content

Commit

Permalink
asyncIO option
Browse files Browse the repository at this point in the history
  • Loading branch information
caleblloyd committed Sep 26, 2016
1 parent 5d63eb6 commit 57f94c3
Show file tree
Hide file tree
Showing 9 changed files with 257 additions and 110 deletions.
12 changes: 6 additions & 6 deletions src/MySqlConnector/MySqlClient/ConnectionPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace MySql.Data.MySqlClient
internal sealed class ConnectionPool
{

public async Task<MySqlSession> GetSessionAsync(CancellationToken cancellationToken)
public async Task<MySqlSession> GetSessionAsync(bool asyncIO, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();

Expand All @@ -26,25 +26,25 @@ public async Task<MySqlSession> GetSessionAsync(CancellationToken cancellationTo
// check for a pooled session
if (m_sessions.TryDequeue(out session))
{
if (!await session.TryPingAsync(cancellationToken).ConfigureAwait(false))
if (!await session.TryPingAsync(asyncIO, cancellationToken).ConfigureAwait(false))
{
// session is not valid
await session.DisposeAsync(cancellationToken).ConfigureAwait(false);
await session.DisposeAsync(asyncIO, cancellationToken).ConfigureAwait(false);
}
else
{
// session is valid, reset if supported
if (m_resetConnections)
{
await session.ResetConnectionAsync(m_userId, m_password, m_database, cancellationToken).ConfigureAwait(false);
await session.ResetConnectionAsync(m_userId, m_password, m_database, asyncIO, cancellationToken).ConfigureAwait(false);
}
// pooled session is ready to be used; return it
return session;
}
}

session = new MySqlSession(this);
await session.ConnectAsync(m_servers, m_port, m_userId, m_password, m_database, m_connectionTimeout, cancellationToken).ConfigureAwait(false);
await session.ConnectAsync(m_servers, m_port, m_userId, m_password, m_database, m_connectionTimeout, asyncIO, cancellationToken).ConfigureAwait(false);
return session;
}
catch
Expand Down Expand Up @@ -84,7 +84,7 @@ public async Task ClearAsync(CancellationToken cancellationToken)
MySqlSession session;
while (m_sessions.TryDequeue(out session))
{
tasks.Add(session.DisposeAsync(cancellationToken));
tasks.Add(session.DisposeAsync(false, cancellationToken));
}
if (tasks.Count > 0)
{
Expand Down
33 changes: 24 additions & 9 deletions src/MySqlConnector/MySqlClient/MySqlCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -103,38 +103,53 @@ protected override DbParameter CreateDbParameter()
}

protected override DbDataReader ExecuteDbDataReader(CommandBehavior behavior)
=> Utility.SynchronousResult(async () => await ExecuteDbDataReaderAsync(behavior, CancellationToken.None)
=> Utility.SynchronousResult(async () => await ExecuteDbDataReaderAsync(behavior, false, CancellationToken.None)
.ConfigureAwait(false));

public override async Task<int> ExecuteNonQueryAsync(CancellationToken cancellationToken)
{
using (var reader = await ExecuteReaderAsync(cancellationToken).ConfigureAwait(false))
return await ExecuteNonQueryAsync(true, cancellationToken);
}

private async Task<int> ExecuteNonQueryAsync(bool asyncIO, CancellationToken cancellationToken)
{
using (var reader = await ExecuteDbDataReaderAsync(CommandBehavior.Default, asyncIO, cancellationToken).ConfigureAwait(false) as MySqlDataReader)
{
do
{
while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
while (await reader.ReadAsync(asyncIO, cancellationToken).ConfigureAwait(false))
{
}
} while (await reader.NextResultAsync(cancellationToken).ConfigureAwait(false));
} while (await reader.NextResultAsync(asyncIO, cancellationToken).ConfigureAwait(false));
return reader.RecordsAffected;
}
}

public override async Task<object> ExecuteScalarAsync(CancellationToken cancellationToken)
{
return await ExecuteScalarAsync(true, cancellationToken);
}

private async Task<object> ExecuteScalarAsync(bool asyncIO, CancellationToken cancellationToken)
{
object result = null;
using (var reader = await ExecuteReaderAsync(CommandBehavior.SingleResult | CommandBehavior.SingleRow, cancellationToken).ConfigureAwait(false))
using (var reader = await ExecuteDbDataReaderAsync(CommandBehavior.SingleResult | CommandBehavior.SingleRow, asyncIO, cancellationToken).ConfigureAwait(false) as MySqlDataReader)
{
do
{
if (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
if (await reader.ReadAsync(asyncIO, cancellationToken).ConfigureAwait(false))
result = reader.GetValue(0);
} while (await reader.NextResultAsync(cancellationToken).ConfigureAwait(false));
} while (await reader.NextResultAsync(asyncIO, cancellationToken).ConfigureAwait(false));
}
return result;
}

protected override async Task<DbDataReader> ExecuteDbDataReaderAsync(CommandBehavior behavior, CancellationToken cancellationToken)
{
return await ExecuteDbDataReaderAsync(behavior, true, cancellationToken);
}

private async Task<DbDataReader> ExecuteDbDataReaderAsync(CommandBehavior behavior, bool asyncIO, CancellationToken cancellationToken)
{
VerifyValid();
Connection.HasActiveReader = true;
Expand All @@ -152,8 +167,8 @@ protected override async Task<DbDataReader> ExecuteDbDataReaderAsync(CommandBeha
var preparer = new MySqlStatementPreparer(CommandText, m_parameterCollection, statementPreparerOptions);
preparer.BindParameters();
var payload = new PayloadData(new ArraySegment<byte>(Payload.CreateEofStringPayload(CommandKind.Query, preparer.PreparedSql)));
await Session.SendAsync(payload, cancellationToken).ConfigureAwait(false);
reader = await MySqlDataReader.CreateAsync(this, behavior, cancellationToken).ConfigureAwait(false);
await Session.SendAsync(payload, asyncIO, cancellationToken).ConfigureAwait(false);
reader = await MySqlDataReader.CreateAsync(this, behavior, asyncIO, cancellationToken).ConfigureAwait(false);
return reader;
}
finally
Expand Down
18 changes: 11 additions & 7 deletions src/MySqlConnector/MySqlClient/MySqlConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,14 @@ public override void ChangeDatabase(string databaseName)
throw new NotImplementedException();
}

public override void Open() => Utility.SynchronousResult(async () => await OpenAsync(CancellationToken.None)
.ConfigureAwait(false));
public override void Open() => Utility.SynchronousResult(async () => await OpenAsync(false, CancellationToken.None).ConfigureAwait(false), true);

public override async Task OpenAsync(CancellationToken cancellationToken)
{
await OpenAsync(true, cancellationToken);
}

private async Task OpenAsync(bool asyncIO, CancellationToken cancellationToken)
{
VerifyNotDisposed();
if (State != ConnectionState.Closed)
Expand All @@ -105,7 +109,7 @@ public override async Task OpenAsync(CancellationToken cancellationToken)

try
{
m_session = await CreateSessionAsync(cancellationToken).ConfigureAwait(false);
m_session = await CreateSessionAsync(asyncIO, cancellationToken).ConfigureAwait(false);

m_hasBeenOpened = true;
SetState(ConnectionState.Open);
Expand Down Expand Up @@ -206,20 +210,20 @@ internal MySqlSession Session
internal bool ConvertZeroDateTime => m_connectionStringBuilder.ConvertZeroDateTime;
internal bool OldGuids => m_connectionStringBuilder.OldGuids;

private async Task<MySqlSession> CreateSessionAsync(CancellationToken cancellationToken)
private async Task<MySqlSession> CreateSessionAsync(bool asyncIO, CancellationToken cancellationToken)
{
// get existing session from the pool if possible
if (m_connectionStringBuilder.Pooling)
{
var pool = ConnectionPool.GetPool(m_connectionStringBuilder);
// this returns an open session
return await pool.GetSessionAsync(cancellationToken).ConfigureAwait(false);
return await pool.GetSessionAsync(asyncIO, cancellationToken).ConfigureAwait(false);
}
else
{
var session = new MySqlSession(null);
await session.ConnectAsync(m_connectionStringBuilder.Server.Split(','), (int) m_connectionStringBuilder.Port, m_connectionStringBuilder.UserID,
m_connectionStringBuilder.Password, m_connectionStringBuilder.Database, (int) m_connectionStringBuilder.ConnectionTimeout, cancellationToken).ConfigureAwait(false);
m_connectionStringBuilder.Password, m_connectionStringBuilder.Database, (int) m_connectionStringBuilder.ConnectionTimeout, asyncIO, cancellationToken).ConfigureAwait(false);
return session;
}
}
Expand Down Expand Up @@ -254,7 +258,7 @@ private void DoClose()
if (m_connectionStringBuilder.Pooling)
m_session.ReturnToPool();
else
Utility.SynchronousResult(async () => await m_session.DisposeAsync(CancellationToken.None).ConfigureAwait(false));
Utility.SynchronousResult(async () => await m_session.DisposeAsync(false, CancellationToken.None).ConfigureAwait(false));
m_session = null;
}
SetState(ConnectionState.Closed);
Expand Down
30 changes: 20 additions & 10 deletions src/MySqlConnector/MySqlClient/MySqlDataReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,15 @@ public sealed class MySqlDataReader : DbDataReader
{
public override bool NextResult()
{
return Utility.SynchronousResult(async () => await NextResultAsync(CancellationToken.None).ConfigureAwait(false));
return Utility.SynchronousResult(async () => await NextResultAsync(false, CancellationToken.None).ConfigureAwait(false));
}

public override async Task<bool> NextResultAsync(CancellationToken cancellationToken)
{
return await NextResultAsync(true, cancellationToken);
}

public async Task<bool> NextResultAsync(bool asyncIO, CancellationToken cancellationToken)
{
VerifyNotDisposed();

Expand All @@ -31,17 +36,22 @@ public override async Task<bool> NextResultAsync(CancellationToken cancellationT
if (oldState != State.HasMoreData)
throw new InvalidOperationException("Invalid state: {0}".FormatInvariant(oldState));

await ReadResultSetHeaderAsync(cancellationToken).ConfigureAwait(false);
await ReadResultSetHeaderAsync(asyncIO, cancellationToken).ConfigureAwait(false);
return true;
}

public override bool Read()
{
VerifyNotDisposed();
return Utility.SynchronousResult(async () => await ReadAsync(CancellationToken.None).ConfigureAwait(false));
return Utility.SynchronousResult(async () => await ReadAsync(false, CancellationToken.None).ConfigureAwait(false));
}

public override Task<bool> ReadAsync(CancellationToken cancellationToken)
{
return ReadAsync(true, cancellationToken);
}

public Task<bool> ReadAsync(bool asyncIO, CancellationToken cancellationToken)
{
VerifyNotDisposed();

Expand All @@ -51,7 +61,7 @@ public override Task<bool> ReadAsync(CancellationToken cancellationToken)

if (m_state != State.AlreadyReadFirstRow)
{
var payloadTask = m_session.ReceiveReplyAsync(cancellationToken);
var payloadTask = m_session.ReceiveReplyAsync(asyncIO, cancellationToken);
if (payloadTask.IsCompletedSuccessfully)
return ReadAsyncRemainder(payloadTask.Result) ? s_trueTask : s_falseTask;
return ReadAsyncAwaited(payloadTask.AsTask());
Expand Down Expand Up @@ -645,10 +655,10 @@ private void DoClose()
}
}

internal static async Task<MySqlDataReader> CreateAsync(MySqlCommand command, CommandBehavior behavior, CancellationToken cancellationToken)
internal static async Task<MySqlDataReader> CreateAsync(MySqlCommand command, CommandBehavior behavior, bool asyncIO, CancellationToken cancellationToken)
{
var dataReader = new MySqlDataReader(command, behavior);
await dataReader.ReadResultSetHeaderAsync(cancellationToken).ConfigureAwait(false);
await dataReader.ReadResultSetHeaderAsync(asyncIO, cancellationToken).ConfigureAwait(false);
return dataReader;
}

Expand Down Expand Up @@ -709,11 +719,11 @@ private MySqlDataReader(MySqlCommand command, CommandBehavior behavior)

private MySqlConnection Connection => m_command.Connection;

private async Task ReadResultSetHeaderAsync(CancellationToken cancellationToken)
private async Task ReadResultSetHeaderAsync(bool asyncIO, CancellationToken cancellationToken)
{
while (true)
{
var payload = await m_session.ReceiveReplyAsync(cancellationToken).ConfigureAwait(false);
var payload = await m_session.ReceiveReplyAsync(asyncIO, cancellationToken).ConfigureAwait(false);

var firstByte = payload.HeaderByte;
if (firstByte == OkPayload.Signature)
Expand All @@ -740,11 +750,11 @@ private async Task ReadResultSetHeaderAsync(CancellationToken cancellationToken)

for (var column = 0; column < m_columnDefinitions.Length; column++)
{
payload = await m_session.ReceiveReplyAsync(cancellationToken).ConfigureAwait(false);
payload = await m_session.ReceiveReplyAsync(asyncIO, cancellationToken).ConfigureAwait(false);
m_columnDefinitions[column] = ColumnDefinitionPayload.Create(payload);
}

payload = await m_session.ReceiveReplyAsync(cancellationToken).ConfigureAwait(false);
payload = await m_session.ReceiveReplyAsync(asyncIO, cancellationToken).ConfigureAwait(false);
EofPayload.Create(payload);

m_command.LastInsertedId = -1;
Expand Down
23 changes: 23 additions & 0 deletions src/MySqlConnector/PriorityScheduler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
using System.Collections.Generic;
using System.Threading.Tasks;

namespace MySql.Data
{
public class PriorityScheduler : TaskScheduler
{
protected override void QueueTask(Task task)
{
throw new System.NotImplementedException();
}

protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
throw new System.NotImplementedException();
}

protected override IEnumerable<Task> GetScheduledTasks()
{
throw new System.NotImplementedException();
}
}
}
Loading

0 comments on commit 57f94c3

Please sign in to comment.