diff --git a/src/MySqlConnector/MySqlClient/ConnectionPool.cs b/src/MySqlConnector/MySqlClient/ConnectionPool.cs index 1c5a38e99..11fa9236d 100644 --- a/src/MySqlConnector/MySqlClient/ConnectionPool.cs +++ b/src/MySqlConnector/MySqlClient/ConnectionPool.cs @@ -10,7 +10,7 @@ namespace MySql.Data.MySqlClient internal sealed class ConnectionPool { - public async Task GetSessionAsync(CancellationToken cancellationToken) + public async Task GetSessionAsync(bool asyncIO, CancellationToken cancellationToken) { cancellationToken.ThrowIfCancellationRequested(); @@ -26,17 +26,17 @@ public async Task GetSessionAsync(CancellationToken cancellationTo // check for a pooled session if (m_sessions.TryDequeue(out session)) { - if (!await session.TryPingAsync(cancellationToken).ConfigureAwait(false)) + if (!await session.TryPingAsync(asyncIO, cancellationToken).ConfigureAwait(false)) { // session is not valid - await session.DisposeAsync(cancellationToken).ConfigureAwait(false); + await session.DisposeAsync(asyncIO, cancellationToken).ConfigureAwait(false); } else { // session is valid, reset if supported if (m_resetConnections) { - await session.ResetConnectionAsync(m_userId, m_password, m_database, cancellationToken).ConfigureAwait(false); + await session.ResetConnectionAsync(m_userId, m_password, m_database, asyncIO, cancellationToken).ConfigureAwait(false); } // pooled session is ready to be used; return it return session; @@ -44,7 +44,7 @@ public async Task GetSessionAsync(CancellationToken cancellationTo } session = new MySqlSession(this); - await session.ConnectAsync(m_servers, m_port, m_userId, m_password, m_database, m_connectionTimeout, cancellationToken).ConfigureAwait(false); + await session.ConnectAsync(m_servers, m_port, m_userId, m_password, m_database, m_connectionTimeout, asyncIO, cancellationToken).ConfigureAwait(false); return session; } catch @@ -84,7 +84,7 @@ public async Task ClearAsync(CancellationToken cancellationToken) MySqlSession session; while (m_sessions.TryDequeue(out session)) { - tasks.Add(session.DisposeAsync(cancellationToken)); + tasks.Add(session.DisposeAsync(false, cancellationToken)); } if (tasks.Count > 0) { diff --git a/src/MySqlConnector/MySqlClient/MySqlCommand.cs b/src/MySqlConnector/MySqlClient/MySqlCommand.cs index 2de01ee5b..8524e01e2 100644 --- a/src/MySqlConnector/MySqlClient/MySqlCommand.cs +++ b/src/MySqlConnector/MySqlClient/MySqlCommand.cs @@ -103,38 +103,53 @@ protected override DbParameter CreateDbParameter() } protected override DbDataReader ExecuteDbDataReader(CommandBehavior behavior) - => Utility.SynchronousResult(async () => await ExecuteDbDataReaderAsync(behavior, CancellationToken.None) + => Utility.SynchronousResult(async () => await ExecuteDbDataReaderAsync(behavior, false, CancellationToken.None) .ConfigureAwait(false)); public override async Task ExecuteNonQueryAsync(CancellationToken cancellationToken) { - using (var reader = await ExecuteReaderAsync(cancellationToken).ConfigureAwait(false)) + return await ExecuteNonQueryAsync(true, cancellationToken); + } + + private async Task ExecuteNonQueryAsync(bool asyncIO, CancellationToken cancellationToken) + { + using (var reader = await ExecuteDbDataReaderAsync(CommandBehavior.Default, asyncIO, cancellationToken).ConfigureAwait(false) as MySqlDataReader) { do { - while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false)) + while (await reader.ReadAsync(asyncIO, cancellationToken).ConfigureAwait(false)) { } - } while (await reader.NextResultAsync(cancellationToken).ConfigureAwait(false)); + } while (await reader.NextResultAsync(asyncIO, cancellationToken).ConfigureAwait(false)); return reader.RecordsAffected; } } public override async Task ExecuteScalarAsync(CancellationToken cancellationToken) + { + return await ExecuteScalarAsync(true, cancellationToken); + } + + private async Task ExecuteScalarAsync(bool asyncIO, CancellationToken cancellationToken) { object result = null; - using (var reader = await ExecuteReaderAsync(CommandBehavior.SingleResult | CommandBehavior.SingleRow, cancellationToken).ConfigureAwait(false)) + using (var reader = await ExecuteDbDataReaderAsync(CommandBehavior.SingleResult | CommandBehavior.SingleRow, asyncIO, cancellationToken).ConfigureAwait(false) as MySqlDataReader) { do { - if (await reader.ReadAsync(cancellationToken).ConfigureAwait(false)) + if (await reader.ReadAsync(asyncIO, cancellationToken).ConfigureAwait(false)) result = reader.GetValue(0); - } while (await reader.NextResultAsync(cancellationToken).ConfigureAwait(false)); + } while (await reader.NextResultAsync(asyncIO, cancellationToken).ConfigureAwait(false)); } return result; } protected override async Task ExecuteDbDataReaderAsync(CommandBehavior behavior, CancellationToken cancellationToken) + { + return await ExecuteDbDataReaderAsync(behavior, true, cancellationToken); + } + + private async Task ExecuteDbDataReaderAsync(CommandBehavior behavior, bool asyncIO, CancellationToken cancellationToken) { VerifyValid(); Connection.HasActiveReader = true; @@ -152,8 +167,8 @@ protected override async Task ExecuteDbDataReaderAsync(CommandBeha var preparer = new MySqlStatementPreparer(CommandText, m_parameterCollection, statementPreparerOptions); preparer.BindParameters(); var payload = new PayloadData(new ArraySegment(Payload.CreateEofStringPayload(CommandKind.Query, preparer.PreparedSql))); - await Session.SendAsync(payload, cancellationToken).ConfigureAwait(false); - reader = await MySqlDataReader.CreateAsync(this, behavior, cancellationToken).ConfigureAwait(false); + await Session.SendAsync(payload, asyncIO, cancellationToken).ConfigureAwait(false); + reader = await MySqlDataReader.CreateAsync(this, behavior, asyncIO, cancellationToken).ConfigureAwait(false); return reader; } finally diff --git a/src/MySqlConnector/MySqlClient/MySqlConnection.cs b/src/MySqlConnector/MySqlClient/MySqlConnection.cs index 9fd5574ce..d5796717e 100644 --- a/src/MySqlConnector/MySqlClient/MySqlConnection.cs +++ b/src/MySqlConnector/MySqlClient/MySqlConnection.cs @@ -88,10 +88,14 @@ public override void ChangeDatabase(string databaseName) throw new NotImplementedException(); } - public override void Open() => Utility.SynchronousResult(async () => await OpenAsync(CancellationToken.None) - .ConfigureAwait(false)); + public override void Open() => Utility.SynchronousResult(async () => await OpenAsync(false, CancellationToken.None).ConfigureAwait(false), true); public override async Task OpenAsync(CancellationToken cancellationToken) + { + await OpenAsync(true, cancellationToken); + } + + private async Task OpenAsync(bool asyncIO, CancellationToken cancellationToken) { VerifyNotDisposed(); if (State != ConnectionState.Closed) @@ -105,7 +109,7 @@ public override async Task OpenAsync(CancellationToken cancellationToken) try { - m_session = await CreateSessionAsync(cancellationToken).ConfigureAwait(false); + m_session = await CreateSessionAsync(asyncIO, cancellationToken).ConfigureAwait(false); m_hasBeenOpened = true; SetState(ConnectionState.Open); @@ -206,20 +210,20 @@ internal MySqlSession Session internal bool ConvertZeroDateTime => m_connectionStringBuilder.ConvertZeroDateTime; internal bool OldGuids => m_connectionStringBuilder.OldGuids; - private async Task CreateSessionAsync(CancellationToken cancellationToken) + private async Task CreateSessionAsync(bool asyncIO, CancellationToken cancellationToken) { // get existing session from the pool if possible if (m_connectionStringBuilder.Pooling) { var pool = ConnectionPool.GetPool(m_connectionStringBuilder); // this returns an open session - return await pool.GetSessionAsync(cancellationToken).ConfigureAwait(false); + return await pool.GetSessionAsync(asyncIO, cancellationToken).ConfigureAwait(false); } else { var session = new MySqlSession(null); await session.ConnectAsync(m_connectionStringBuilder.Server.Split(','), (int) m_connectionStringBuilder.Port, m_connectionStringBuilder.UserID, - m_connectionStringBuilder.Password, m_connectionStringBuilder.Database, (int) m_connectionStringBuilder.ConnectionTimeout, cancellationToken).ConfigureAwait(false); + m_connectionStringBuilder.Password, m_connectionStringBuilder.Database, (int) m_connectionStringBuilder.ConnectionTimeout, asyncIO, cancellationToken).ConfigureAwait(false); return session; } } @@ -254,7 +258,7 @@ private void DoClose() if (m_connectionStringBuilder.Pooling) m_session.ReturnToPool(); else - Utility.SynchronousResult(async () => await m_session.DisposeAsync(CancellationToken.None).ConfigureAwait(false)); + Utility.SynchronousResult(async () => await m_session.DisposeAsync(false, CancellationToken.None).ConfigureAwait(false)); m_session = null; } SetState(ConnectionState.Closed); diff --git a/src/MySqlConnector/MySqlClient/MySqlDataReader.cs b/src/MySqlConnector/MySqlClient/MySqlDataReader.cs index a6734eb83..6a55ceaba 100644 --- a/src/MySqlConnector/MySqlClient/MySqlDataReader.cs +++ b/src/MySqlConnector/MySqlClient/MySqlDataReader.cs @@ -14,10 +14,15 @@ public sealed class MySqlDataReader : DbDataReader { public override bool NextResult() { - return Utility.SynchronousResult(async () => await NextResultAsync(CancellationToken.None).ConfigureAwait(false)); + return Utility.SynchronousResult(async () => await NextResultAsync(false, CancellationToken.None).ConfigureAwait(false)); } public override async Task NextResultAsync(CancellationToken cancellationToken) + { + return await NextResultAsync(true, cancellationToken); + } + + public async Task NextResultAsync(bool asyncIO, CancellationToken cancellationToken) { VerifyNotDisposed(); @@ -31,17 +36,22 @@ public override async Task NextResultAsync(CancellationToken cancellationT if (oldState != State.HasMoreData) throw new InvalidOperationException("Invalid state: {0}".FormatInvariant(oldState)); - await ReadResultSetHeaderAsync(cancellationToken).ConfigureAwait(false); + await ReadResultSetHeaderAsync(asyncIO, cancellationToken).ConfigureAwait(false); return true; } public override bool Read() { VerifyNotDisposed(); - return Utility.SynchronousResult(async () => await ReadAsync(CancellationToken.None).ConfigureAwait(false)); + return Utility.SynchronousResult(async () => await ReadAsync(false, CancellationToken.None).ConfigureAwait(false)); } public override Task ReadAsync(CancellationToken cancellationToken) + { + return ReadAsync(true, cancellationToken); + } + + public Task ReadAsync(bool asyncIO, CancellationToken cancellationToken) { VerifyNotDisposed(); @@ -51,7 +61,7 @@ public override Task ReadAsync(CancellationToken cancellationToken) if (m_state != State.AlreadyReadFirstRow) { - var payloadTask = m_session.ReceiveReplyAsync(cancellationToken); + var payloadTask = m_session.ReceiveReplyAsync(asyncIO, cancellationToken); if (payloadTask.IsCompletedSuccessfully) return ReadAsyncRemainder(payloadTask.Result) ? s_trueTask : s_falseTask; return ReadAsyncAwaited(payloadTask.AsTask()); @@ -645,10 +655,10 @@ private void DoClose() } } - internal static async Task CreateAsync(MySqlCommand command, CommandBehavior behavior, CancellationToken cancellationToken) + internal static async Task CreateAsync(MySqlCommand command, CommandBehavior behavior, bool asyncIO, CancellationToken cancellationToken) { var dataReader = new MySqlDataReader(command, behavior); - await dataReader.ReadResultSetHeaderAsync(cancellationToken).ConfigureAwait(false); + await dataReader.ReadResultSetHeaderAsync(asyncIO, cancellationToken).ConfigureAwait(false); return dataReader; } @@ -709,11 +719,11 @@ private MySqlDataReader(MySqlCommand command, CommandBehavior behavior) private MySqlConnection Connection => m_command.Connection; - private async Task ReadResultSetHeaderAsync(CancellationToken cancellationToken) + private async Task ReadResultSetHeaderAsync(bool asyncIO, CancellationToken cancellationToken) { while (true) { - var payload = await m_session.ReceiveReplyAsync(cancellationToken).ConfigureAwait(false); + var payload = await m_session.ReceiveReplyAsync(asyncIO, cancellationToken).ConfigureAwait(false); var firstByte = payload.HeaderByte; if (firstByte == OkPayload.Signature) @@ -740,11 +750,11 @@ private async Task ReadResultSetHeaderAsync(CancellationToken cancellationToken) for (var column = 0; column < m_columnDefinitions.Length; column++) { - payload = await m_session.ReceiveReplyAsync(cancellationToken).ConfigureAwait(false); + payload = await m_session.ReceiveReplyAsync(asyncIO, cancellationToken).ConfigureAwait(false); m_columnDefinitions[column] = ColumnDefinitionPayload.Create(payload); } - payload = await m_session.ReceiveReplyAsync(cancellationToken).ConfigureAwait(false); + payload = await m_session.ReceiveReplyAsync(asyncIO, cancellationToken).ConfigureAwait(false); EofPayload.Create(payload); m_command.LastInsertedId = -1; diff --git a/src/MySqlConnector/PriorityScheduler.cs b/src/MySqlConnector/PriorityScheduler.cs new file mode 100644 index 000000000..d2d59d74f --- /dev/null +++ b/src/MySqlConnector/PriorityScheduler.cs @@ -0,0 +1,23 @@ +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace MySql.Data +{ + public class PriorityScheduler : TaskScheduler + { + protected override void QueueTask(Task task) + { + throw new System.NotImplementedException(); + } + + protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) + { + throw new System.NotImplementedException(); + } + + protected override IEnumerable GetScheduledTasks() + { + throw new System.NotImplementedException(); + } + } +} diff --git a/src/MySqlConnector/Serialization/MySqlSession.cs b/src/MySqlConnector/Serialization/MySqlSession.cs index 6f3adb74e..ebb359734 100644 --- a/src/MySqlConnector/Serialization/MySqlSession.cs +++ b/src/MySqlConnector/Serialization/MySqlSession.cs @@ -23,14 +23,14 @@ public MySqlSession(ConnectionPool pool) public void ReturnToPool() => Pool?.Return(this); - public async Task DisposeAsync(CancellationToken cancellationToken) + public async Task DisposeAsync(bool asyncIO, CancellationToken cancellationToken) { if (m_transmitter != null) { try { - await m_transmitter.SendAsync(QuitPayload.Create(), cancellationToken).ConfigureAwait(false); - await m_transmitter.TryReceiveReplyAsync(cancellationToken).ConfigureAwait(false); + await m_transmitter.SendAsync(QuitPayload.Create(), asyncIO, cancellationToken).ConfigureAwait(false); + await m_transmitter.TryReceiveReplyAsync(asyncIO, cancellationToken).ConfigureAwait(false); } catch (SocketException) { @@ -54,13 +54,13 @@ public async Task DisposeAsync(CancellationToken cancellationToken) m_state = State.Closed; } - public async Task ConnectAsync(IEnumerable hosts, int port, string userId, string password, string database, int timeoutSeconds, CancellationToken cancellationToken) + public async Task ConnectAsync(IEnumerable hosts, int port, string userId, string password, string database, int timeoutSeconds, bool asyncIO, CancellationToken cancellationToken) { - var connected = await OpenSocketAsync(hosts, port, timeoutSeconds).ConfigureAwait(false); + var connected = await OpenSocketAsync(hosts, port, timeoutSeconds, asyncIO).ConfigureAwait(false); if (!connected) throw new MySqlException("Unable to connect to any of the specified MySQL hosts."); - var payload = await ReceiveAsync(cancellationToken).ConfigureAwait(false); + var payload = await ReceiveAsync(asyncIO, cancellationToken).ConfigureAwait(false); var reader = new ByteArrayReader(payload.ArraySegment.Array, payload.ArraySegment.Offset, payload.ArraySegment.Count); var initialHandshake = new InitialHandshakePacket(reader); if (initialHandshake.AuthPluginName != "mysql_native_password") @@ -70,16 +70,16 @@ public async Task ConnectAsync(IEnumerable hosts, int port, string userI var response = HandshakeResponse41Packet.Create(initialHandshake, userId, password, database); payload = new PayloadData(new ArraySegment(response)); - await SendReplyAsync(payload, cancellationToken).ConfigureAwait(false); - await ReceiveReplyAsync(cancellationToken).ConfigureAwait(false); + await SendReplyAsync(payload, asyncIO, cancellationToken).ConfigureAwait(false); + await ReceiveReplyAsync(asyncIO, cancellationToken).ConfigureAwait(false); } - public async Task ResetConnectionAsync(string userId, string password, string database, CancellationToken cancellationToken) + public async Task ResetConnectionAsync(string userId, string password, string database, bool asyncIO, CancellationToken cancellationToken) { if (ServerVersion.Version.CompareTo(ServerVersions.SupportsResetConnection) >= 0) { - await SendAsync(ResetConnectionPayload.Create(), cancellationToken).ConfigureAwait(false); - var payload = await ReceiveReplyAsync(cancellationToken).ConfigureAwait(false); + await SendAsync(ResetConnectionPayload.Create(), asyncIO, cancellationToken).ConfigureAwait(false); + var payload = await ReceiveReplyAsync(asyncIO, cancellationToken).ConfigureAwait(false); OkPayload.Create(payload); } else @@ -87,8 +87,8 @@ public async Task ResetConnectionAsync(string userId, string password, string da // optimistically hash the password with the challenge from the initial handshake (supported by MariaDB; doesn't appear to be supported by MySQL) var hashedPassword = AuthenticationUtility.CreateAuthenticationResponse(AuthPluginData, 0, password); var payload = ChangeUserPayload.Create(userId, hashedPassword, database); - await SendAsync(payload, cancellationToken).ConfigureAwait(false); - payload = await ReceiveReplyAsync(cancellationToken).ConfigureAwait(false); + await SendAsync(payload, asyncIO, cancellationToken).ConfigureAwait(false); + payload = await ReceiveReplyAsync(asyncIO, cancellationToken).ConfigureAwait(false); if (payload.HeaderByte == AuthenticationMethodSwitchRequestPayload.Signature) { // if the server didn't support the hashed password; rehash with the new challenge @@ -97,19 +97,19 @@ public async Task ResetConnectionAsync(string userId, string password, string da throw new NotSupportedException("Only 'mysql_native_password' authentication method is supported."); hashedPassword = AuthenticationUtility.CreateAuthenticationResponse(switchRequest.Data, 0, password); payload = new PayloadData(new ArraySegment(hashedPassword)); - await SendReplyAsync(payload, cancellationToken).ConfigureAwait(false); - payload = await ReceiveReplyAsync(cancellationToken).ConfigureAwait(false); + await SendReplyAsync(payload, asyncIO, cancellationToken).ConfigureAwait(false); + payload = await ReceiveReplyAsync(asyncIO, cancellationToken).ConfigureAwait(false); } OkPayload.Create(payload); } } - public async Task TryPingAsync(CancellationToken cancellationToken) + public async Task TryPingAsync(bool asyncIO, CancellationToken cancellationToken) { - await SendAsync(PingPayload.Create(), cancellationToken).ConfigureAwait(false); + await SendAsync(PingPayload.Create(), asyncIO, cancellationToken).ConfigureAwait(false); try { - var payload = await ReceiveReplyAsync(cancellationToken).ConfigureAwait(false); + var payload = await ReceiveReplyAsync(asyncIO, cancellationToken).ConfigureAwait(false); OkPayload.Create(payload); return true; } @@ -124,20 +124,20 @@ public async Task TryPingAsync(CancellationToken cancellationToken) } // Starts a new conversation with the server by sending the first packet. - public Task SendAsync(PayloadData payload, CancellationToken cancellationToken) - => TryAsync(m_transmitter.SendAsync, payload, cancellationToken); + public Task SendAsync(PayloadData payload, bool asyncIO, CancellationToken cancellationToken) + => TryAsync(m_transmitter.SendAsync, payload, asyncIO, cancellationToken); // Starts a new conversation with the server by receiving the first packet. - public ValueTask ReceiveAsync(CancellationToken cancellationToken) - => TryAsync(m_transmitter.ReceiveAsync, cancellationToken); + public ValueTask ReceiveAsync(bool asyncIO, CancellationToken cancellationToken) + => TryAsync(m_transmitter.ReceiveAsync, asyncIO, cancellationToken); // Continues a conversation with the server by receiving a response to a packet sent with 'Send' or 'SendReply'. - public ValueTask ReceiveReplyAsync(CancellationToken cancellationToken) - => TryAsync(m_transmitter.ReceiveReplyAsync, cancellationToken); + public ValueTask ReceiveReplyAsync(bool asyncIO, CancellationToken cancellationToken) + => TryAsync(m_transmitter.ReceiveReplyAsync, asyncIO, cancellationToken); // Continues a conversation with the server by sending a reply to a packet received with 'Receive' or 'ReceiveReply'. - public Task SendReplyAsync(PayloadData payload, CancellationToken cancellationToken) - => TryAsync(m_transmitter.SendReplyAsync, payload, cancellationToken); + public Task SendReplyAsync(PayloadData payload, bool asyncIO, CancellationToken cancellationToken) + => TryAsync(m_transmitter.SendReplyAsync, payload, asyncIO, cancellationToken); private void VerifyConnected() { @@ -147,9 +147,10 @@ private void VerifyConnected() throw new InvalidOperationException("MySqlSession is not connected."); } - private async Task OpenSocketAsync(IEnumerable hostnames, int port, int timeoutSeconds) + private async Task OpenSocketAsync(IEnumerable hostnames, int port, int timeoutSeconds, bool asyncIO) { DateTime? connectTimeout = null; + int? timeoutMs = null; if (timeoutSeconds > 0) { connectTimeout = DateTime.UtcNow + TimeSpan.FromSeconds(timeoutSeconds); @@ -173,19 +174,36 @@ private async Task OpenSocketAsync(IEnumerable hostnames, int port var socket = new Socket(ipAddress.AddressFamily, SocketType.Stream, ProtocolType.Tcp); try { - var connectTasks = new List + if (connectTimeout != null) + { + timeoutMs = (int) (connectTimeout.Value - DateTime.UtcNow).TotalMilliseconds; + } + if (timeoutMs == null || timeoutMs > 0) { + if (asyncIO) + { + Console.WriteLine("OpenSocketAsync Async"); + var connectTasks = new List + { #if NETSTANDARD1_3 - socket.ConnectAsync(ipAddress, port) + socket.ConnectAsync(ipAddress, port) #else - Task.Factory.FromAsync(socket.BeginConnect, socket.EndConnect, hostname, port, null) + Task.Factory.FromAsync(socket.BeginConnect, socket.EndConnect, hostname, port, null) #endif - }; - if (connectTimeout != null) - { - connectTasks.Add(Task.Delay((int)(connectTimeout.Value - DateTime.UtcNow).TotalMilliseconds)); + }; + if (connectTimeout != null) + { + connectTasks.Add(Task.Delay(timeoutMs.Value)); + } + await Task.WhenAny(connectTasks).ConfigureAwait(false); + } + else + { + Console.WriteLine("OpenSocketAsync Sync"); + // todo: timeout + socket.Connect(ipAddress, port); + } } - await Task.WhenAny(connectTasks).ConfigureAwait(false); if (socket.Connected) { m_socket = socket; @@ -212,10 +230,10 @@ private async Task OpenSocketAsync(IEnumerable hostnames, int port } - private Task TryAsync(Func func, TArg arg, CancellationToken cancellationToken) + private Task TryAsync(Func func, TArg arg, bool asyncIO, CancellationToken cancellationToken) { VerifyConnected(); - var task = func(arg, cancellationToken); + var task = func(arg, asyncIO, cancellationToken); if (task.Status == TaskStatus.RanToCompletion) return task; @@ -232,10 +250,10 @@ private async Task TryAsyncContinuation(Task task) } } - private ValueTask TryAsync(Func> func, CancellationToken cancellationToken) + private ValueTask TryAsync(Func> func, bool asyncIO, CancellationToken cancellationToken) { VerifyConnected(); - var task = func(cancellationToken); + var task = func(asyncIO, cancellationToken); if (task.IsCompletedSuccessfully) { if (task.Result.HeaderByte != ErrorPayload.Signature) diff --git a/src/MySqlConnector/Serialization/PacketTransmitter.cs b/src/MySqlConnector/Serialization/PacketTransmitter.cs index 27969f990..7af15c36b 100644 --- a/src/MySqlConnector/Serialization/PacketTransmitter.cs +++ b/src/MySqlConnector/Serialization/PacketTransmitter.cs @@ -18,32 +18,32 @@ public PacketTransmitter(Socket socket) } // Starts a new conversation with the server by sending the first packet. - public Task SendAsync(PayloadData payload, CancellationToken cancellationToken) + public Task SendAsync(PayloadData payload, bool asyncIO, CancellationToken cancellationToken) { m_sequenceId = 0; - return DoSendAsync(payload, cancellationToken); + return DoSendAsync(payload, asyncIO, cancellationToken); } // Starts a new conversation with the server by receiving the first packet. - public ValueTask ReceiveAsync(CancellationToken cancellationToken) + public ValueTask ReceiveAsync(bool asyncIO, CancellationToken cancellationToken) { m_sequenceId = 0; - return DoReceiveAsync(cancellationToken); + return DoReceiveAsync(asyncIO, false, cancellationToken); } // Continues a conversation with the server by receiving a response to a packet sent with 'Send' or 'SendReply'. - public ValueTask ReceiveReplyAsync(CancellationToken cancellationToken) - => DoReceiveAsync(cancellationToken); + public ValueTask ReceiveReplyAsync(bool asyncIO, CancellationToken cancellationToken) + => DoReceiveAsync(asyncIO, false, cancellationToken); // Continues a conversation with the server by receiving a response to a packet sent with 'Send' or 'SendReply'. - public ValueTask TryReceiveReplyAsync(CancellationToken cancellationToken) - => DoReceiveAsync(cancellationToken, optional: true); + public ValueTask TryReceiveReplyAsync(bool asyncIO, CancellationToken cancellationToken) + => DoReceiveAsync(asyncIO, true, cancellationToken); // Continues a conversation with the server by sending a reply to a packet received with 'Receive' or 'ReceiveReply'. - public Task SendReplyAsync(PayloadData payload, CancellationToken cancellationToken) - => DoSendAsync(payload, cancellationToken); + public Task SendReplyAsync(PayloadData payload, bool asyncIO, CancellationToken cancellationToken) + => DoSendAsync(payload, asyncIO, cancellationToken); - private async Task DoSendAsync(PayloadData payload, CancellationToken cancellationToken) + private async Task DoSendAsync(PayloadData payload, bool asyncIO, CancellationToken cancellationToken) { var bytesSent = 0; var data = payload.ArraySegment; @@ -61,23 +61,40 @@ private async Task DoSendAsync(PayloadData payload, CancellationToken cancellati if (bytesToSend <= m_buffer.Length - 4) { Buffer.BlockCopy(data.Array, data.Offset + bytesSent, m_buffer, 4, bytesToSend); - m_socketAwaitable.EventArgs.SetBuffer(0, bytesToSend + 4); - await m_socket.SendAsync(m_socketAwaitable); + if (asyncIO) + { + m_socketAwaitable.EventArgs.SetBuffer(0, bytesToSend + 4); + await m_socket.SendAsync(m_socketAwaitable); + } + else + { + m_socket.Send(m_buffer, 0, bytesToSend + 4, SocketFlags.None); + } } else { - m_socketAwaitable.EventArgs.SetBuffer(null, 0, 0); - m_socketAwaitable.EventArgs.BufferList = new[] { new ArraySegment(m_buffer, 0, 4), new ArraySegment(data.Array, data.Offset + bytesSent, bytesToSend) }; - await m_socket.SendAsync(m_socketAwaitable); - m_socketAwaitable.EventArgs.BufferList = null; - m_socketAwaitable.EventArgs.SetBuffer(m_buffer, 0, 0); + if (asyncIO) + { + Console.WriteLine("DoSendAsync Async"); + m_socketAwaitable.EventArgs.SetBuffer(null, 0, 0); + m_socketAwaitable.EventArgs.BufferList = new[] { new ArraySegment(m_buffer, 0, 4), new ArraySegment(data.Array, data.Offset + bytesSent, bytesToSend) }; + await m_socket.SendAsync(m_socketAwaitable); + m_socketAwaitable.EventArgs.BufferList = null; + m_socketAwaitable.EventArgs.SetBuffer(m_buffer, 0, 0); + } + else + { + Console.WriteLine("DoSendAsync Sync"); + m_socket.Send(m_buffer, 0, 4, SocketFlags.None); + m_socket.Send(data.Array, data.Offset + bytesSent, bytesToSend, SocketFlags.None); + } } bytesSent += bytesToSend; } while (bytesToSend == c_maxPacketSize); } - private ValueTask DoReceiveAsync(CancellationToken cancellationToken, bool optional = false) + private ValueTask DoReceiveAsync(bool asyncIO, bool optional, CancellationToken cancellationToken) { if (m_end - m_offset > 4) { @@ -100,13 +117,13 @@ private ValueTask DoReceiveAsync(CancellationToken cancellationToke } } - return new ValueTask(DoReceiveAsync2(cancellationToken, optional)); + return new ValueTask(DoReceiveAsync2(asyncIO, optional, cancellationToken)); } - private async Task DoReceiveAsync2(CancellationToken cancellationToken, bool optional = false) + private async Task DoReceiveAsync2(bool asyncIO, bool optional, CancellationToken cancellationToken) { // common case: the payload is contained within one packet - var payload = await ReceivePacketAsync(cancellationToken, optional).ConfigureAwait(false); + var payload = await ReceivePacketAsync(asyncIO, optional, cancellationToken).ConfigureAwait(false); if (payload == null || payload.ArraySegment.Count != c_maxPacketSize) return payload; @@ -117,7 +134,7 @@ private async Task DoReceiveAsync2(CancellationToken cancellationTo do { - payload = await ReceivePacketAsync(cancellationToken, optional).ConfigureAwait(false); + payload = await ReceivePacketAsync(asyncIO, optional, cancellationToken).ConfigureAwait(false); var oldLength = payloadBytes.Length; Array.Resize(ref payloadBytes, payloadBytes.Length + payload.ArraySegment.Count); @@ -127,7 +144,7 @@ private async Task DoReceiveAsync2(CancellationToken cancellationTo return new PayloadData(new ArraySegment(payloadBytes)); } - private async Task ReceivePacketAsync(CancellationToken cancellationToken, bool optional) + private async Task ReceivePacketAsync(bool asyncIO, bool optional, CancellationToken cancellationToken) { if (m_end - m_offset < 4) { @@ -142,9 +159,19 @@ private async Task ReceivePacketAsync(CancellationToken cancellatio int count = m_buffer.Length - m_end; while (m_end - m_offset < 4) { - m_socketAwaitable.EventArgs.SetBuffer(offset, count); - await m_socket.ReceiveAsync(m_socketAwaitable); - int bytesRead = m_socketAwaitable.EventArgs.BytesTransferred; + int bytesRead; + if (asyncIO) + { + Console.WriteLine("ReceivePacketAsync Async"); + m_socketAwaitable.EventArgs.SetBuffer(offset, count); + await m_socket.ReceiveAsync(m_socketAwaitable); + bytesRead = m_socketAwaitable.EventArgs.BytesTransferred; + } + else + { + Console.WriteLine("ReceivePacketAsync Sync"); + bytesRead = m_socket.Receive(m_buffer, offset, count, SocketFlags.None); + } if (bytesRead <= 0) { if (optional) @@ -179,7 +206,10 @@ private async Task ReceivePacketAsync(CancellationToken cancellatio if (payloadLength > m_buffer.Length) { readData = new byte[payloadLength]; - m_socketAwaitable.EventArgs.SetBuffer(readData, 0, 0); + if (asyncIO) + { + m_socketAwaitable.EventArgs.SetBuffer(readData, 0, 0); + } } Buffer.BlockCopy(m_buffer, m_offset, readData, 0, m_end - m_offset); m_end -= m_offset; @@ -190,9 +220,17 @@ private async Task ReceivePacketAsync(CancellationToken cancellatio count = readData.Length - m_end; while (m_end < payloadLength) { - m_socketAwaitable.EventArgs.SetBuffer(offset, count); - await m_socket.ReceiveAsync(m_socketAwaitable); - int bytesRead = m_socketAwaitable.EventArgs.BytesTransferred; + int bytesRead; + if (asyncIO) + { + m_socketAwaitable.EventArgs.SetBuffer(offset, count); + await m_socket.ReceiveAsync(m_socketAwaitable); + bytesRead = m_socketAwaitable.EventArgs.BytesTransferred; + } + else + { + bytesRead = m_socket.Receive(readData, offset, count, SocketFlags.None); + } if (bytesRead <= 0) throw new EndOfStreamException(); offset += bytesRead; @@ -203,7 +241,10 @@ private async Task ReceivePacketAsync(CancellationToken cancellatio // switch back to original buffer if a larger one was allocated if (payloadLength > m_buffer.Length) { - m_socketAwaitable.EventArgs.SetBuffer(m_buffer, 0, 0); + if (asyncIO) + { + m_socketAwaitable.EventArgs.SetBuffer(m_buffer, 0, 0); + } m_end = 0; } diff --git a/src/MySqlConnector/Utility.cs b/src/MySqlConnector/Utility.cs index a48cbe37d..456973f30 100644 --- a/src/MySqlConnector/Utility.cs +++ b/src/MySqlConnector/Utility.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Concurrent; using System.Diagnostics; using System.Globalization; using System.Text; @@ -27,38 +28,72 @@ public static string FormatInvariant(this string format, params object[] args) public static string GetString(this Encoding encoding, ArraySegment arraySegment) => encoding.GetString(arraySegment.Array, arraySegment.Offset, arraySegment.Count); - public static void SynchronousResult(Func t) + public static void SynchronousResult(Func t, bool lowPriority = false) { var invoked = Thread.CurrentThread.ManagedThreadId; Func log = async () => { Console.WriteLine("Thread ID: " + Thread.CurrentThread.ManagedThreadId + " Invoked from Thread ID: " + invoked); - //Console.WriteLine(Environment.StackTrace); await t().ConfigureAwait(false); }; - + if (lowPriority) + { + Task highPriority; + while (s_high_priority.TryDequeue(out highPriority)) + { + highPriority.GetAwaiter().GetResult(); + } + } Console.WriteLine("Thread CT: " + Process.GetCurrentProcess().Threads.Count); Console.WriteLine("Thread ID: " + invoked + " Kicking off task"); var run = log(); + if (!lowPriority) + { + s_high_priority.Enqueue(run); + } run.GetAwaiter().GetResult(); - +// var run = lowPriority ? s_low_priority_task_factory.StartNew(log) : s_normal_priority_task_factory.StartNew(log); +// run.Unwrap().GetAwaiter().GetResult(); } - public static T SynchronousResult(Func> t) + public static T SynchronousResult(Func> t, bool lowPriority=false) { var invoked = Thread.CurrentThread.ManagedThreadId; Func> log = async () => { Console.WriteLine("Thread ID: " + Thread.CurrentThread.ManagedThreadId + " Invoked from Thread ID: " + invoked); - //Console.WriteLine(Environment.StackTrace); return await t().ConfigureAwait(false); }; - + if (lowPriority) + { + Task highPriority; + while (s_high_priority.TryDequeue(out highPriority)) + { + highPriority.GetAwaiter().GetResult(); + } + } Console.WriteLine("Thread CT: " + Process.GetCurrentProcess().Threads.Count); Console.WriteLine("Thread ID: " + invoked + " Kicking off task"); var run = log(); + if (!lowPriority) + { + s_high_priority.Enqueue(run); + } return run.GetAwaiter().GetResult(); +// var run = lowPriority ? s_low_priority_task_factory.StartNew(log) : s_normal_priority_task_factory.StartNew(log); +// return run.Unwrap().GetAwaiter().GetResult(); } + private static ConcurrentQueue s_high_priority = new ConcurrentQueue(); + + private static readonly ConcurrentExclusiveSchedulerPair s_low_prioirty_scheduler = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, 25); + private static readonly TaskFactory s_low_priority_task_factory = new TaskFactory( + CancellationToken.None, TaskCreationOptions.LongRunning, + TaskContinuationOptions.None, TaskScheduler.Default); + + private static readonly TaskFactory s_normal_priority_task_factory = new TaskFactory( + CancellationToken.None, TaskCreationOptions.None, + TaskContinuationOptions.None, TaskScheduler.Default); + } } diff --git a/tests/MySqlConnector.Performance/scripts/vegeta/targets-sync2.txt b/tests/MySqlConnector.Performance/scripts/vegeta/targets-sync2.txt new file mode 100644 index 000000000..1b2345caf --- /dev/null +++ b/tests/MySqlConnector.Performance/scripts/vegeta/targets-sync2.txt @@ -0,0 +1 @@ +GET http://localhost:5000/api/sync