Skip to content

Commit

Permalink
Backlog locking fixes - more #2008 follow-up (#2015)
Browse files Browse the repository at this point in the history
In troubleshooting these 2 tests, I realized what's happening: a really dumb placement mistake in #2008. Now, instead of waiting inside the damn lock, it loops outside the lock, a bit cleaner and higher up. Performance wins are the same but it's a lot saner and doesn't block both the backlog and the writer for another 5 seconds. Now only the thread lingers, and it'll try to get the lock when running another pass if it gets any more backlog items in the next 5 seconds.
  • Loading branch information
NickCraver authored Feb 26, 2022
1 parent 511ba10 commit 32dea99
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 26 deletions.
5 changes: 1 addition & 4 deletions src/StackExchange.Redis/ExceptionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -308,10 +308,7 @@ ServerEndPoint server
Add(data, sb, "Queue-Awaiting-Write", "qu", bs.BacklogMessagesPending.ToString());
Add(data, sb, "Queue-Awaiting-Response", "qs", bs.Connection.MessagesSentAwaitingResponse.ToString());
Add(data, sb, "Active-Writer", "aw", bs.IsWriterActive.ToString());
if (bs.BacklogMessagesPending != 0)
{
Add(data, sb, "Backlog-Writer", "bw", bs.BacklogStatus.ToString());
}
Add(data, sb, "Backlog-Writer", "bw", bs.BacklogStatus.ToString());
if (bs.Connection.ReadStatus != PhysicalConnection.ReadStatus.NA) Add(data, sb, "Read-State", "rs", bs.Connection.ReadStatus.ToString());
if (bs.Connection.WriteStatus != PhysicalConnection.WriteStatus.NA) Add(data, sb, "Write-State", "ws", bs.Connection.WriteStatus.ToString());

Expand Down
36 changes: 20 additions & 16 deletions src/StackExchange.Redis/PhysicalBridge.cs
Original file line number Diff line number Diff line change
Expand Up @@ -887,6 +887,7 @@ internal enum BacklogStatus : byte
Starting,
Started,
CheckingForWork,
SpinningDown,
CheckingForTimeout,
CheckingForTimeoutComplete,
RecordingTimeout,
Expand All @@ -909,12 +910,25 @@ private async Task ProcessBacklogAsync()
_backlogStatus = BacklogStatus.Starting;
try
{
if (!_backlog.IsEmpty)
while (true)
{
// TODO: vNext handoff this backlog to another primary ("can handle everything") connection
// and remove any per-server commands. This means we need to track a bit of whether something
// was server-endpoint-specific in PrepareToPushMessageToBridge (was the server ref null or not)
await ProcessBridgeBacklogAsync().ForAwait(); // Needs handoff
if (!_backlog.IsEmpty)
{
// TODO: vNext handoff this backlog to another primary ("can handle everything") connection
// and remove any per-server commands. This means we need to track a bit of whether something
// was server-endpoint-specific in PrepareToPushMessageToBridge (was the server ref null or not)
await ProcessBridgeBacklogAsync().ForAwait();
}

// The cost of starting a new thread is high, and we can bounce in and out of the backlog a lot.
// So instead of just exiting, keep this thread waiting for 5 seconds to see if we got another backlog item.
_backlogStatus = BacklogStatus.SpinningDown;
// Note this is happening *outside* the lock
var gotMore = _backlogAutoReset.WaitOne(5000);
if (!gotMore)
{
break;
}
}
}
catch
Expand Down Expand Up @@ -1000,17 +1014,7 @@ private async Task ProcessBridgeBacklogAsync()
// If there's nothing left in queue, we're done.
if (!BacklogTryDequeue(out message))
{
// The cost of starting a new thread is high, and we can bounce in and out of the backlog a lot.
// So instead of just exiting, keep this thread waiting for 5 seconds to see if we got another backlog item.
var gotMore = _backlogAutoReset.WaitOne(5000);
if (gotMore)
{
continue;
}
else
{
break;
}
break;
}
}

Expand Down
4 changes: 3 additions & 1 deletion tests/StackExchange.Redis.Tests/BasicOps.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public async Task PingOnce()
}

[Fact]
public void RapidDispose()
public async Task RapidDispose()
{
RedisKey key = Me();
using (var primary = Create())
Expand All @@ -41,6 +41,8 @@ public void RapidDispose()
secondary.GetDatabase().StringIncrement(key, flags: CommandFlags.FireAndForget);
}
}
// Give it a moment to get through the pipe...they were fire and forget
await UntilConditionAsync(TimeSpan.FromSeconds(5), () => 10 == (int)conn.StringGet(key));
Assert.Equal(10, (int)conn.StringGet(key));
}
}
Expand Down
6 changes: 3 additions & 3 deletions tests/StackExchange.Redis.Tests/ExceptionFactoryTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public void TimeoutException()
Assert.StartsWith("Test Timeout, command=PING", ex.Message);
Assert.Contains("clientName: " + nameof(TimeoutException), ex.Message);
// Ensure our pipe numbers are in place
Assert.Contains("inst: 0, qu: 0, qs: 0, aw: False, in: 0, in-pipe: 0, out-pipe: 0", ex.Message);
Assert.Contains("inst: 0, qu: 0, qs: 0, aw: False, bw: Inactive, in: 0, in-pipe: 0, out-pipe: 0", ex.Message);
Assert.Contains("mc: 1/1/0", ex.Message);
Assert.Contains("serverEndpoint: " + server.EndPoint, ex.Message);
Assert.Contains("IOCP: ", ex.Message);
Expand Down Expand Up @@ -194,13 +194,13 @@ public void NoConnectionException(bool abortOnConnect, int connCount, int comple
// Ensure our pipe numbers are in place if they should be
if (hasDetail)
{
Assert.Contains("inst: 0, qu: 0, qs: 0, aw: False, in: 0, in-pipe: 0, out-pipe: 0", ex.Message);
Assert.Contains("inst: 0, qu: 0, qs: 0, aw: False, bw: Inactive, in: 0, in-pipe: 0, out-pipe: 0", ex.Message);
Assert.Contains($"mc: {connCount}/{completeCount}/0", ex.Message);
Assert.Contains("serverEndpoint: " + server.EndPoint.ToString().Replace("Unspecified/", ""), ex.Message);
}
else
{
Assert.DoesNotContain("inst: 0, qu: 0, qs: 0, aw: False, in: 0, in-pipe: 0, out-pipe: 0", ex.Message);
Assert.DoesNotContain("inst: 0, qu: 0, qs: 0, aw: False, bw: Inactive, in: 0, in-pipe: 0, out-pipe: 0", ex.Message);
Assert.DoesNotContain($"mc: {connCount}/{completeCount}/0", ex.Message);
Assert.DoesNotContain("serverEndpoint: " + server.EndPoint.ToString().Replace("Unspecified/", ""), ex.Message);
}
Expand Down
4 changes: 2 additions & 2 deletions tests/StackExchange.Redis.Tests/PubSubMultiserver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,13 @@ await sub.SubscribeAsync(channel, (_, val) =>
else
{
// This subscription shouldn't be able to reconnect by flags (demanding an unavailable server)
await UntilConditionAsync(TimeSpan.FromSeconds(2), () => subscription.IsConnected);
await UntilConditionAsync(TimeSpan.FromSeconds(5), () => subscription.IsConnected);
Assert.False(subscription.IsConnected);
Log("Unable to reconnect (as expected)");

// Allow connecting back to the original
muxer.AllowConnect = true;
await UntilConditionAsync(TimeSpan.FromSeconds(2), () => subscription.IsConnected);
await UntilConditionAsync(TimeSpan.FromSeconds(5), () => subscription.IsConnected);
Assert.True(subscription.IsConnected);

var newServer = subscription.GetCurrentServer();
Expand Down

0 comments on commit 32dea99

Please sign in to comment.