Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async backlog #1612

Merged
merged 3 commits into from
Nov 13, 2020
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 16 additions & 22 deletions src/StackExchange.Redis/PhysicalBridge.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.IO.Pipelines;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Pipelines.Sockets.Unofficial;
using Pipelines.Sockets.Unofficial.Threading;
using static Pipelines.Sockets.Unofficial.Threading.MutexSlim;
using static StackExchange.Redis.ConnectionMultiplexer;
Expand Down Expand Up @@ -56,9 +53,7 @@ public PhysicalBridge(ServerEndPoint serverEndPoint, ConnectionType type, int ti
Name = Format.ToString(serverEndPoint.EndPoint) + "/" + ConnectionType.ToString();
TimeoutMilliseconds = timeoutMilliseconds;
_singleWriterMutex = new MutexSlim(timeoutMilliseconds: timeoutMilliseconds);
_weakRefThis = new WeakReference(this);
}
private readonly WeakReference _weakRefThis;

private readonly int TimeoutMilliseconds;

Expand Down Expand Up @@ -771,22 +766,15 @@ private bool PushToBacklog(Message message, bool onlyIfExists)
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void StartBacklogProcessor()
{
var sched = Multiplexer.SocketManager?.Scheduler ?? PipeScheduler.ThreadPool;
#if DEBUG
_backlogProcessorRequestedTime = Environment.TickCount;
#endif
sched.Schedule(s_ProcessBacklog, _weakRefThis);
Task.Run(ProcessBacklogAsync);
}
#if DEBUG
private volatile int _backlogProcessorRequestedTime;
#endif

private static readonly Action<object> s_ProcessBacklog = s =>
{
var wr = (WeakReference)s;
if (wr.Target is PhysicalBridge bridge) bridge.ProcessBacklog();
};

private void CheckBacklogForTimeouts() // check the head of the backlog queue, consuming anything that looks dead
{
lock (_backlog)
Expand All @@ -810,6 +798,7 @@ private void CheckBacklogForTimeouts() // check the head of the backlog queue, c
internal enum BacklogStatus : byte
{
Inactive,
Starting,
Started,
CheckingForWork,
CheckingForTimeout,
Expand All @@ -823,7 +812,7 @@ internal enum BacklogStatus : byte
Faulted,
}
private volatile BacklogStatus _backlogStatus;
private void ProcessBacklog()
private async Task ProcessBacklogAsync()
{
LockToken token = default;
try
Expand All @@ -833,12 +822,19 @@ private void ProcessBacklog()
var msToStartWorker = unchecked(tryToAcquireTime - _backlogProcessorRequestedTime);
int failureCount = 0;
#endif
while(true)
_backlogStatus = BacklogStatus.Starting;
while (true)
{
// try and get the lock; if unsuccessful, check for termination
token = _singleWriterMutex.TryWait();
if (token) break; // got the lock
lock (_backlog) { if (_backlog.Count == 0) return; }
// check whether the backlog is empty *before* even trying to get the lock
lock (_backlog)
{
if (_backlog.Count == 0) return; // nothing to do
}

// try and get the lock; if unsuccessful, retry
token = await _singleWriterMutex.TryWaitAsync().ConfigureAwait(false);
if (token.Success) break; // got the lock; now go do something with it

#if DEBUG
failureCount++;
#endif
Expand Down Expand Up @@ -887,9 +883,7 @@ private void ProcessBacklog()
if (result == WriteResult.Success)
{
_backlogStatus = BacklogStatus.Flushing;
#pragma warning disable CS0618
result = physical.FlushSync(false, timeout);
#pragma warning restore CS0618
result = await physical.FlushAsync(false).ConfigureAwait(false);
}

_backlogStatus = BacklogStatus.MarkingInactive;
Expand Down