Skip to content

Commit

Permalink
Use synchronous I/O for sync methods. Fixes #62
Browse files Browse the repository at this point in the history
  • Loading branch information
bgrainger committed Sep 29, 2016
1 parent d65a3ae commit 9f5ea21
Show file tree
Hide file tree
Showing 9 changed files with 253 additions and 144 deletions.
18 changes: 9 additions & 9 deletions src/MySqlConnector/MySqlClient/ConnectionPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace MySql.Data.MySqlClient
{
internal sealed class ConnectionPool
{
public async Task<MySqlSession> GetSessionAsync(CancellationToken cancellationToken)
public async Task<MySqlSession> GetSessionAsync(IOBehavior ioBehavior, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();

Expand All @@ -24,25 +24,25 @@ public async Task<MySqlSession> GetSessionAsync(CancellationToken cancellationTo
// check for a pooled session
if (m_sessions.TryDequeue(out session))
{
if (!await session.TryPingAsync(cancellationToken).ConfigureAwait(false))
if (!await session.TryPingAsync(ioBehavior, cancellationToken).ConfigureAwait(false))
{
// session is not valid
await session.DisposeAsync(cancellationToken).ConfigureAwait(false);
await session.DisposeAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
}
else
{
// session is valid, reset if supported
if (m_resetConnections)
{
await session.ResetConnectionAsync(m_userId, m_password, m_database, cancellationToken).ConfigureAwait(false);
await session.ResetConnectionAsync(m_userId, m_password, m_database, ioBehavior, cancellationToken).ConfigureAwait(false);
}
// pooled session is ready to be used; return it
return session;
}
}

session = new MySqlSession(this);
await session.ConnectAsync(m_servers, m_port, m_userId, m_password, m_database, cancellationToken).ConfigureAwait(false);
await session.ConnectAsync(m_servers, m_port, m_userId, m_password, m_database, ioBehavior, cancellationToken).ConfigureAwait(false);
return session;
}
catch
Expand All @@ -64,7 +64,7 @@ public void Return(MySqlSession session)
}
}

public async Task ClearAsync(CancellationToken cancellationToken)
public async Task ClearAsync(IOBehavior ioBehavior, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
try
Expand All @@ -82,7 +82,7 @@ public async Task ClearAsync(CancellationToken cancellationToken)
MySqlSession session;
while (m_sessions.TryDequeue(out session))
{
tasks.Add(session.DisposeAsync(cancellationToken));
tasks.Add(session.DisposeAsync(ioBehavior, cancellationToken));
}
if (tasks.Count > 0)
{
Expand Down Expand Up @@ -116,12 +116,12 @@ public static ConnectionPool GetPool(MySqlConnectionStringBuilder csb)
return pool;
}

public static async Task ClearPoolsAsync(CancellationToken cancellationToken)
public static async Task ClearPoolsAsync(IOBehavior ioBehavior, CancellationToken cancellationToken)
{
var pools = new List<ConnectionPool>(s_pools.Values);

foreach (var pool in pools)
await pool.ClearAsync(cancellationToken).ConfigureAwait(false);
await pool.ClearAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
}

private ConnectionPool(IEnumerable<string> servers, int port, string userId, string password, string database,
Expand Down
43 changes: 26 additions & 17 deletions src/MySqlConnector/MySqlClient/MySqlCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ public override void Cancel()
throw new NotSupportedException("Use the Async overloads with a CancellationToken.");
}

public override int ExecuteNonQuery()
=> ExecuteNonQueryAsync(CancellationToken.None).GetAwaiter().GetResult();
public override int ExecuteNonQuery() =>
ExecuteNonQueryAsync(IOBehavior.Synchronous, CancellationToken.None).GetAwaiter().GetResult();

public override object ExecuteScalar()
=> ExecuteScalarAsync(CancellationToken.None).GetAwaiter().GetResult();
public override object ExecuteScalar() =>
ExecuteScalarAsync(IOBehavior.Synchronous, CancellationToken.None).GetAwaiter().GetResult();

public override void Prepare()
{
Expand Down Expand Up @@ -102,38 +102,47 @@ protected override DbParameter CreateDbParameter()
return new MySqlParameter();
}

protected override DbDataReader ExecuteDbDataReader(CommandBehavior behavior)
=> ExecuteDbDataReaderAsync(behavior, CancellationToken.None).GetAwaiter().GetResult();
protected override DbDataReader ExecuteDbDataReader(CommandBehavior behavior) =>
ExecuteReaderAsync(behavior, IOBehavior.Synchronous, CancellationToken.None).GetAwaiter().GetResult();

public override async Task<int> ExecuteNonQueryAsync(CancellationToken cancellationToken)
public override Task<int> ExecuteNonQueryAsync(CancellationToken cancellationToken) =>
ExecuteNonQueryAsync(IOBehavior.Asynchronous, cancellationToken);

internal async Task<int> ExecuteNonQueryAsync(IOBehavior ioBehavior, CancellationToken cancellationToken)
{
using (var reader = await ExecuteReaderAsync(cancellationToken).ConfigureAwait(false))
using (var reader = (MySqlDataReader) await ExecuteReaderAsync(CommandBehavior.Default, ioBehavior, cancellationToken).ConfigureAwait(false))
{
do
{
while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
while (await reader.ReadAsync(ioBehavior, cancellationToken).ConfigureAwait(false))
{
}
} while (await reader.NextResultAsync(cancellationToken).ConfigureAwait(false));
} while (await reader.NextResultAsync(ioBehavior, cancellationToken).ConfigureAwait(false));
return reader.RecordsAffected;
}
}

public override async Task<object> ExecuteScalarAsync(CancellationToken cancellationToken)
public override Task<object> ExecuteScalarAsync(CancellationToken cancellationToken) =>
ExecuteScalarAsync(IOBehavior.Asynchronous, cancellationToken);

internal async Task<object> ExecuteScalarAsync(IOBehavior ioBehavior, CancellationToken cancellationToken)
{
object result = null;
using (var reader = await ExecuteReaderAsync(CommandBehavior.SingleResult | CommandBehavior.SingleRow, cancellationToken).ConfigureAwait(false))
using (var reader = (MySqlDataReader) await ExecuteReaderAsync(CommandBehavior.SingleResult | CommandBehavior.SingleRow, ioBehavior, cancellationToken).ConfigureAwait(false))
{
do
{
if (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
if (await reader.ReadAsync(ioBehavior, cancellationToken).ConfigureAwait(false))
result = reader.GetValue(0);
} while (await reader.NextResultAsync(cancellationToken).ConfigureAwait(false));
} while (await reader.NextResultAsync(ioBehavior, cancellationToken).ConfigureAwait(false));
}
return result;
}

protected override async Task<DbDataReader> ExecuteDbDataReaderAsync(CommandBehavior behavior, CancellationToken cancellationToken)
protected override Task<DbDataReader> ExecuteDbDataReaderAsync(CommandBehavior behavior, CancellationToken cancellationToken) =>
ExecuteReaderAsync(behavior, IOBehavior.Asynchronous, cancellationToken);

internal async Task<DbDataReader> ExecuteReaderAsync(CommandBehavior behavior, IOBehavior ioBehavior, CancellationToken cancellationToken)
{
VerifyValid();
Connection.HasActiveReader = true;
Expand All @@ -151,8 +160,8 @@ protected override async Task<DbDataReader> ExecuteDbDataReaderAsync(CommandBeha
var preparer = new MySqlStatementPreparer(CommandText, m_parameterCollection, statementPreparerOptions);
preparer.BindParameters();
var payload = new PayloadData(new ArraySegment<byte>(Payload.CreateEofStringPayload(CommandKind.Query, preparer.PreparedSql)));
await Session.SendAsync(payload, cancellationToken).ConfigureAwait(false);
reader = await MySqlDataReader.CreateAsync(this, behavior, cancellationToken).ConfigureAwait(false);
await Session.SendAsync(payload, ioBehavior, cancellationToken).ConfigureAwait(false);
reader = await MySqlDataReader.CreateAsync(this, behavior, ioBehavior, cancellationToken).ConfigureAwait(false);
return reader;
}
finally
Expand Down
42 changes: 23 additions & 19 deletions src/MySqlConnector/MySqlClient/MySqlConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ public MySqlConnection(string connectionString)
public new MySqlTransaction BeginTransaction() => (MySqlTransaction) base.BeginTransaction();

public Task<MySqlTransaction> BeginTransactionAsync(CancellationToken cancellationToken = default(CancellationToken)) =>
BeginDbTransactionAsync(IsolationLevel.Unspecified, cancellationToken);
BeginDbTransactionAsync(IsolationLevel.Unspecified, IOBehavior.Asynchronous, cancellationToken);

public Task<MySqlTransaction> BeginTransactionAsync(IsolationLevel isolationLevel, CancellationToken cancellationToken = default(CancellationToken)) =>
BeginDbTransactionAsync(isolationLevel, cancellationToken);
BeginDbTransactionAsync(isolationLevel, IOBehavior.Asynchronous, cancellationToken);

protected override DbTransaction BeginDbTransaction(IsolationLevel isolationLevel) =>
BeginDbTransactionAsync(isolationLevel).GetAwaiter().GetResult();
BeginDbTransactionAsync(isolationLevel, IOBehavior.Synchronous, CancellationToken.None).GetAwaiter().GetResult();

private async Task<MySqlTransaction> BeginDbTransactionAsync(IsolationLevel isolationLevel, CancellationToken cancellationToken = default(CancellationToken))
private async Task<MySqlTransaction> BeginDbTransactionAsync(IsolationLevel isolationLevel, IOBehavior ioBehavior, CancellationToken cancellationToken)
{
if (State != ConnectionState.Open)
throw new InvalidOperationException("Connection is not open.");
Expand Down Expand Up @@ -67,7 +67,7 @@ protected override DbTransaction BeginDbTransaction(IsolationLevel isolationLeve
}

using (var cmd = new MySqlCommand("set session transaction isolation level " + isolationLevelValue + "; start transaction;", this))
await cmd.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
await cmd.ExecuteNonQueryAsync(ioBehavior, cancellationToken).ConfigureAwait(false);

var transaction = new MySqlTransaction(this, isolationLevel);
CurrentTransaction = transaction;
Expand All @@ -88,9 +88,12 @@ public override void ChangeDatabase(string databaseName)
throw new NotImplementedException();
}

public override void Open() => OpenAsync(CancellationToken.None).GetAwaiter().GetResult();
public override void Open() => OpenAsync(IOBehavior.Synchronous, CancellationToken.None).GetAwaiter().GetResult();

public override async Task OpenAsync(CancellationToken cancellationToken)
public override Task OpenAsync(CancellationToken cancellationToken) =>
OpenAsync(IOBehavior.Asynchronous, cancellationToken);

private async Task OpenAsync(IOBehavior ioBehavior, CancellationToken cancellationToken)
{
VerifyNotDisposed();
if (State != ConnectionState.Closed)
Expand All @@ -104,7 +107,7 @@ public override async Task OpenAsync(CancellationToken cancellationToken)

try
{
m_session = await CreateSessionAsync(cancellationToken).ConfigureAwait(false);
m_session = await CreateSessionAsync(ioBehavior, cancellationToken).ConfigureAwait(false);

m_hasBeenOpened = true;
SetState(ConnectionState.Open);
Expand Down Expand Up @@ -147,20 +150,21 @@ public override string ConnectionString

public override string ServerVersion => m_session.ServerVersion.OriginalString;

public static void ClearPool(MySqlConnection connection) => ClearPoolAsync(connection, CancellationToken.None).GetAwaiter().GetResult();
public static void ClearAllPools() => ClearAllPoolsAsync(CancellationToken.None).GetAwaiter().GetResult();
public static Task ClearPoolAsync(MySqlConnection connection) => ClearPoolAsync(connection, CancellationToken.None);
public static Task ClearAllPoolsAsync() => ClearAllPoolsAsync(CancellationToken.None);
public static Task ClearAllPoolsAsync(CancellationToken cancellationToken) => ConnectionPool.ClearPoolsAsync(cancellationToken);
public static void ClearPool(MySqlConnection connection) => ClearPoolAsync(connection, IOBehavior.Synchronous, CancellationToken.None).GetAwaiter().GetResult();
public static Task ClearPoolAsync(MySqlConnection connection) => ClearPoolAsync(connection, IOBehavior.Asynchronous, CancellationToken.None);
public static Task ClearPoolAsync(MySqlConnection connection, CancellationToken cancellationToken) => ClearPoolAsync(connection, IOBehavior.Asynchronous, cancellationToken);
public static void ClearAllPools() => ConnectionPool.ClearPoolsAsync(IOBehavior.Synchronous, CancellationToken.None).GetAwaiter().GetResult();
public static Task ClearAllPoolsAsync() => ConnectionPool.ClearPoolsAsync(IOBehavior.Asynchronous, CancellationToken.None);
public static Task ClearAllPoolsAsync(CancellationToken cancellationToken) => ConnectionPool.ClearPoolsAsync(IOBehavior.Asynchronous, cancellationToken);

public static async Task ClearPoolAsync(MySqlConnection connection, CancellationToken cancellationToken)
private static async Task ClearPoolAsync(MySqlConnection connection, IOBehavior ioBehavior, CancellationToken cancellationToken)
{
if (connection == null)
throw new ArgumentNullException(nameof(connection));

var pool = ConnectionPool.GetPool(connection.m_connectionStringBuilder);
if (pool != null)
await pool.ClearAsync(cancellationToken).ConfigureAwait(false);
await pool.ClearAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
}

protected override DbCommand CreateDbCommand() => new MySqlCommand(this, CurrentTransaction);
Expand Down Expand Up @@ -203,7 +207,7 @@ internal MySqlSession Session
internal bool ConvertZeroDateTime => m_connectionStringBuilder.ConvertZeroDateTime;
internal bool OldGuids => m_connectionStringBuilder.OldGuids;

private async Task<MySqlSession> CreateSessionAsync(CancellationToken cancellationToken)
private async Task<MySqlSession> CreateSessionAsync(IOBehavior ioBehavior, CancellationToken cancellationToken)
{
var connectTimeout = m_connectionStringBuilder.ConnectionTimeout == 0 ? Timeout.InfiniteTimeSpan : TimeSpan.FromSeconds(checked((int) m_connectionStringBuilder.ConnectionTimeout));
using (var timeoutSource = new CancellationTokenSource(connectTimeout))
Expand All @@ -217,13 +221,13 @@ private async Task<MySqlSession> CreateSessionAsync(CancellationToken cancellati
var pool = ConnectionPool.GetPool(m_connectionStringBuilder);

// this returns an open session
return await pool.GetSessionAsync(linkedSource.Token).ConfigureAwait(false);
return await pool.GetSessionAsync(ioBehavior, linkedSource.Token).ConfigureAwait(false);
}
else
{
var session = new MySqlSession(null);
await session.ConnectAsync(m_connectionStringBuilder.Server.Split(','), (int) m_connectionStringBuilder.Port, m_connectionStringBuilder.UserID,
m_connectionStringBuilder.Password, m_connectionStringBuilder.Database, linkedSource.Token).ConfigureAwait(false);
m_connectionStringBuilder.Password, m_connectionStringBuilder.Database, ioBehavior, linkedSource.Token).ConfigureAwait(false);
return session;
}
}
Expand Down Expand Up @@ -264,7 +268,7 @@ private void DoClose()
if (m_connectionStringBuilder.Pooling)
m_session.ReturnToPool();
else
m_session.DisposeAsync(CancellationToken.None).GetAwaiter().GetResult();
m_session.DisposeAsync(IOBehavior.Synchronous, CancellationToken.None).GetAwaiter().GetResult();
m_session = null;
}
SetState(ConnectionState.Closed);
Expand Down
36 changes: 20 additions & 16 deletions src/MySqlConnector/MySqlClient/MySqlDataReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,18 @@ namespace MySql.Data.MySqlClient
{
public sealed class MySqlDataReader : DbDataReader
{
public override bool NextResult()
{
return NextResultAsync(CancellationToken.None).GetAwaiter().GetResult();
}
public override bool NextResult() =>
NextResultAsync(IOBehavior.Synchronous, CancellationToken.None).GetAwaiter().GetResult();

public override async Task<bool> NextResultAsync(CancellationToken cancellationToken)
public override Task<bool> NextResultAsync(CancellationToken cancellationToken) =>
NextResultAsync(IOBehavior.Asynchronous, cancellationToken);

internal async Task<bool> NextResultAsync(IOBehavior ioBehavior, CancellationToken cancellationToken)
{
VerifyNotDisposed();

while (m_state == State.ReadingRows || m_state == State.ReadResultSetHeader)
await ReadAsync(cancellationToken).ConfigureAwait(false);
await ReadAsync(ioBehavior, cancellationToken).ConfigureAwait(false);

var oldState = m_state;
Reset();
Expand All @@ -31,17 +32,20 @@ public override async Task<bool> NextResultAsync(CancellationToken cancellationT
if (oldState != State.HasMoreData)
throw new InvalidOperationException("Invalid state: {0}".FormatInvariant(oldState));

await ReadResultSetHeaderAsync(cancellationToken).ConfigureAwait(false);
await ReadResultSetHeaderAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
return true;
}

public override bool Read()
{
VerifyNotDisposed();
return ReadAsync(CancellationToken.None).GetAwaiter().GetResult();
return ReadAsync(IOBehavior.Synchronous, CancellationToken.None).GetAwaiter().GetResult();
}

public override Task<bool> ReadAsync(CancellationToken cancellationToken)
public override Task<bool> ReadAsync(CancellationToken cancellationToken) =>
ReadAsync(IOBehavior.Asynchronous, cancellationToken);

internal Task<bool> ReadAsync(IOBehavior ioBehavior, CancellationToken cancellationToken)
{
VerifyNotDisposed();

Expand All @@ -51,7 +55,7 @@ public override Task<bool> ReadAsync(CancellationToken cancellationToken)

if (m_state != State.AlreadyReadFirstRow)
{
var payloadTask = m_session.ReceiveReplyAsync(cancellationToken);
var payloadTask = m_session.ReceiveReplyAsync(ioBehavior, cancellationToken);
if (payloadTask.IsCompletedSuccessfully)
return ReadAsyncRemainder(payloadTask.Result) ? s_trueTask : s_falseTask;
return ReadAsyncAwaited(payloadTask.AsTask());
Expand Down Expand Up @@ -645,10 +649,10 @@ private void DoClose()
}
}

internal static async Task<MySqlDataReader> CreateAsync(MySqlCommand command, CommandBehavior behavior, CancellationToken cancellationToken)
internal static async Task<MySqlDataReader> CreateAsync(MySqlCommand command, CommandBehavior behavior, IOBehavior ioBehavior, CancellationToken cancellationToken)
{
var dataReader = new MySqlDataReader(command, behavior);
await dataReader.ReadResultSetHeaderAsync(cancellationToken).ConfigureAwait(false);
await dataReader.ReadResultSetHeaderAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
return dataReader;
}

Expand Down Expand Up @@ -709,11 +713,11 @@ private MySqlDataReader(MySqlCommand command, CommandBehavior behavior)

private MySqlConnection Connection => m_command.Connection;

private async Task ReadResultSetHeaderAsync(CancellationToken cancellationToken)
private async Task ReadResultSetHeaderAsync(IOBehavior ioBehavior, CancellationToken cancellationToken)
{
while (true)
{
var payload = await m_session.ReceiveReplyAsync(cancellationToken).ConfigureAwait(false);
var payload = await m_session.ReceiveReplyAsync(ioBehavior, cancellationToken).ConfigureAwait(false);

var firstByte = payload.HeaderByte;
if (firstByte == OkPayload.Signature)
Expand All @@ -740,11 +744,11 @@ private async Task ReadResultSetHeaderAsync(CancellationToken cancellationToken)

for (var column = 0; column < m_columnDefinitions.Length; column++)
{
payload = await m_session.ReceiveReplyAsync(cancellationToken).ConfigureAwait(false);
payload = await m_session.ReceiveReplyAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
m_columnDefinitions[column] = ColumnDefinitionPayload.Create(payload);
}

payload = await m_session.ReceiveReplyAsync(cancellationToken).ConfigureAwait(false);
payload = await m_session.ReceiveReplyAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
EofPayload.Create(payload);

m_command.LastInsertedId = -1;
Expand Down
Loading

4 comments on commit 9f5ea21

@caleblloyd
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's a Proof of Concept that implements DNS Cache for NETSTANDARD1_3: caleblloyd@c647af5

It feels hacky, but that's why it's limited to SyncIO on NETSTANDARD1_3. It's the only way I could think of that did not involve backporting the resolver from NETSTANDARD2.

Let me know if you think this is a good idea to go forward with, and I can clean it up and open a PR to the sync_io branch.

@bgrainger
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I'll try to take a look at this later tonight.

@caleblloyd
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The more I think about it, the more I dislike the DNS Cache option. Resolving hosts is a hard, multi-tier thing and making our own assumptions about caching could break other guarantees of the resolver (TTLs, round-robin responses, etc.)

I think the best approach is to implement min pool size in another commit/issue and suggest Connection Pool Warming for those who need high concurrency synchronous connections. We should probably start a "Best Practices" section in the README.md or on a wiki page:

  1. Use Async all the way through
  2. If you must use Synchronous connections on .NET Core, ensure that you warm your connection pool by setting min pool size in the connection string and calling db.Connect(); db.Close(); before accepting any requests

That being said, I think the sync_io branch looks great except we should probably add syncIO logic to the SemaphoreSlim calls in ConnectionPool.GetSessionAsync (example here)

I'm getting consistently good Sync performance on a warmed connection pool. Connection pool can be warmed by running Async performance tests after a cold start, then running Sync performance tests.

@bgrainger
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree; I think this is best addressed as a "known issue" for now, with workarounds of using OpenAsync if possible and using a bigger minimum pool size (once that's implemented). Long term, netstandard-2.0 may provide more options.

I want to add a few more tests (to verify the sync path) but after that I think it's worth merging to improve the library for sync callers.

Please sign in to comment.