diff --git a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs index 85a4b5062..741914306 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs @@ -2,7 +2,7 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; -using System.Collections.Generic; +using System.Collections.Concurrent; using System.Runtime.ExceptionServices; using System.Threading; using System.Threading.Tasks; @@ -24,11 +24,8 @@ public class KestrelThread private Thread _thread; private UvLoopHandle _loop; private UvAsyncHandle _post; - private Queue _workAdding = new Queue(); - private Queue _workRunning = new Queue(); - private Queue _closeHandleAdding = new Queue(); - private Queue _closeHandleRunning = new Queue(); - private object _workSync = new Object(); + private ConcurrentQueue _workCurrent = new ConcurrentQueue(); + private ConcurrentQueue _closeHandlesCurrent = new ConcurrentQueue(); private bool _stopImmediate = false; private bool _initCompleted = false; private ExceptionDispatchInfo _closeError; @@ -113,40 +110,36 @@ private void OnStopImmediate(object obj) public void Post(Action callback, object state) { - lock (_workSync) - { - _workAdding.Enqueue(new Work { CallbackAdapter = _objectCallbackAdapter, Callback = callback, State = state }); - } + _workCurrent.Enqueue(new Work { CallbackAdapter = _objectCallbackAdapter, Callback = callback, State = state }); + _post.Send(); } public void Post(Action callback, T state) { - lock (_workSync) + + _workCurrent.Enqueue(new Work { - _workAdding.Enqueue(new Work - { - CallbackAdapter = (callback2, state2) => ((Action)callback2).Invoke((T)state2), - Callback = callback, - State = state - }); - } + CallbackAdapter = (callback2, state2) => ((Action)callback2).Invoke((T)state2), + Callback = callback, + State = state + }); + _post.Send(); } public Task PostAsync(Action callback, object state) { var tcs = new TaskCompletionSource(); - lock (_workSync) + + _workCurrent.Enqueue(new Work { - _workAdding.Enqueue(new Work - { - CallbackAdapter = _objectCallbackAdapter, - Callback = callback, - State = state, - Completion = tcs - }); - } + CallbackAdapter = _objectCallbackAdapter, + Callback = callback, + State = state, + Completion = tcs + }); + _post.Send(); return tcs.Task; } @@ -154,16 +147,15 @@ public Task PostAsync(Action callback, object state) public Task PostAsync(Action callback, T state) { var tcs = new TaskCompletionSource(); - lock (_workSync) + + _workCurrent.Enqueue(new Work { - _workAdding.Enqueue(new Work - { - CallbackAdapter = (state1, state2) => ((Action)state1).Invoke((T)state2), - Callback = callback, - State = state, - Completion = tcs - }); - } + CallbackAdapter = (state1, state2) => ((Action)state1).Invoke((T)state2), + Callback = callback, + State = state, + Completion = tcs + }); + _post.Send(); return tcs.Task; } @@ -182,10 +174,8 @@ public void Send(Action callback, object state) private void PostCloseHandle(Action callback, IntPtr handle) { - lock (_workSync) - { - _closeHandleAdding.Enqueue(new CloseHandle { Callback = callback, Handle = handle }); - } + _closeHandlesCurrent.Enqueue(new CloseHandle { Callback = callback, Handle = handle }); + _post.Send(); } @@ -253,16 +243,9 @@ private void OnPost() private void DoPostWork() { - Queue queue; - lock (_workSync) - { - queue = _workAdding; - _workAdding = _workRunning; - _workRunning = queue; - } - while (queue.Count != 0) + Work work; + while (_workCurrent.TryDequeue(out work)) { - var work = queue.Dequeue(); try { work.CallbackAdapter(work.Callback, work.State); @@ -280,7 +263,7 @@ private void DoPostWork() { if (work.Completion != null) { - ThreadPool.QueueUserWorkItem(_ => work.Completion.SetException(ex), null); + ThreadPool.QueueUserWorkItem(tcs => ((TaskCompletionSource)tcs).SetException(ex), work.Completion); } else { @@ -292,16 +275,9 @@ private void DoPostWork() } private void DoPostCloseHandle() { - Queue queue; - lock (_workSync) - { - queue = _closeHandleAdding; - _closeHandleAdding = _closeHandleRunning; - _closeHandleRunning = queue; - } - while (queue.Count != 0) + CloseHandle closeHandle; + while (_closeHandlesCurrent.TryDequeue(out closeHandle)) { - var closeHandle = queue.Dequeue(); try { closeHandle.Callback(closeHandle.Handle);