Skip to content
This repository has been archived by the owner on Dec 18, 2018. It is now read-only.

Remove KestrelThread locks #316

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 35 additions & 59 deletions src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -24,11 +24,8 @@ public class KestrelThread
private Thread _thread;
private UvLoopHandle _loop;
private UvAsyncHandle _post;
private Queue<Work> _workAdding = new Queue<Work>();
private Queue<Work> _workRunning = new Queue<Work>();
private Queue<CloseHandle> _closeHandleAdding = new Queue<CloseHandle>();
private Queue<CloseHandle> _closeHandleRunning = new Queue<CloseHandle>();
private object _workSync = new Object();
private ConcurrentQueue<Work> _workCurrent = new ConcurrentQueue<Work>();
private ConcurrentQueue<CloseHandle> _closeHandlesCurrent = new ConcurrentQueue<CloseHandle>();
private bool _stopImmediate = false;
private bool _initCompleted = false;
private ExceptionDispatchInfo _closeError;
Expand Down Expand Up @@ -113,57 +110,52 @@ private void OnStopImmediate(object obj)

public void Post(Action<object> callback, object state)
{
lock (_workSync)
{
_workAdding.Enqueue(new Work { CallbackAdapter = _objectCallbackAdapter, Callback = callback, State = state });
}
_workCurrent.Enqueue(new Work { CallbackAdapter = _objectCallbackAdapter, Callback = callback, State = state });

_post.Send();
}

public void Post<T>(Action<T> callback, T state)
{
lock (_workSync)

_workCurrent.Enqueue(new Work
{
_workAdding.Enqueue(new Work
{
CallbackAdapter = (callback2, state2) => ((Action<T>)callback2).Invoke((T)state2),
Callback = callback,
State = state
});
}
CallbackAdapter = (callback2, state2) => ((Action<T>)callback2).Invoke((T)state2),
Callback = callback,
State = state
});

_post.Send();
}

public Task PostAsync(Action<object> callback, object state)
{
var tcs = new TaskCompletionSource<int>();
lock (_workSync)

_workCurrent.Enqueue(new Work
{
_workAdding.Enqueue(new Work
{
CallbackAdapter = _objectCallbackAdapter,
Callback = callback,
State = state,
Completion = tcs
});
}
CallbackAdapter = _objectCallbackAdapter,
Callback = callback,
State = state,
Completion = tcs
});

_post.Send();
return tcs.Task;
}

public Task PostAsync<T>(Action<T> callback, T state)
{
var tcs = new TaskCompletionSource<int>();
lock (_workSync)

_workCurrent.Enqueue(new Work
{
_workAdding.Enqueue(new Work
{
CallbackAdapter = (state1, state2) => ((Action<T>)state1).Invoke((T)state2),
Callback = callback,
State = state,
Completion = tcs
});
}
CallbackAdapter = (state1, state2) => ((Action<T>)state1).Invoke((T)state2),
Callback = callback,
State = state,
Completion = tcs
});

_post.Send();
return tcs.Task;
}
Expand All @@ -182,10 +174,8 @@ public void Send(Action<object> callback, object state)

private void PostCloseHandle(Action<IntPtr> callback, IntPtr handle)
{
lock (_workSync)
{
_closeHandleAdding.Enqueue(new CloseHandle { Callback = callback, Handle = handle });
}
_closeHandlesCurrent.Enqueue(new CloseHandle { Callback = callback, Handle = handle });

_post.Send();
}

Expand Down Expand Up @@ -253,16 +243,9 @@ private void OnPost()

private void DoPostWork()
{
Queue<Work> queue;
lock (_workSync)
{
queue = _workAdding;
_workAdding = _workRunning;
_workRunning = queue;
}
while (queue.Count != 0)
Work work;
while (_workCurrent.TryDequeue(out work))
{
var work = queue.Dequeue();
try
{
work.CallbackAdapter(work.Callback, work.State);
Expand All @@ -280,7 +263,7 @@ private void DoPostWork()
{
if (work.Completion != null)
{
ThreadPool.QueueUserWorkItem(_ => work.Completion.SetException(ex), null);
ThreadPool.QueueUserWorkItem(tcs => ((TaskCompletionSource<int>)tcs).SetException(ex), work.Completion);
}
else
{
Expand All @@ -292,16 +275,9 @@ private void DoPostWork()
}
private void DoPostCloseHandle()
{
Queue<CloseHandle> queue;
lock (_workSync)
{
queue = _closeHandleAdding;
_closeHandleAdding = _closeHandleRunning;
_closeHandleRunning = queue;
}
while (queue.Count != 0)
CloseHandle closeHandle;
while (_closeHandlesCurrent.TryDequeue(out closeHandle))
{
var closeHandle = queue.Dequeue();
try
{
closeHandle.Callback(closeHandle.Handle);
Expand Down