Pub/Sub fixes for subscribe/re-subscribe #1947

Merged 69 commits on Feb 4, 2022
990f5e8
WIP: Pub/Sub portion of #1912
NickCraver Jan 10, 2022
38132a3
Merge remote-tracking branch 'origin/main' into craver/pub-sub-issues
NickCraver Jan 10, 2022
d552097
Lots of things - need to writeup in PR
NickCraver Jan 10, 2022
fac5a1b
Fix KeepAlive on PhysicalBridge
NickCraver Jan 10, 2022
1cb00ff
Fix default version tests
NickCraver Jan 10, 2022
7fdb45a
Fix up Isue922 test now that we ping the right things
NickCraver Jan 11, 2022
85c5a4d
Migrate PubSub tests off sync threads
NickCraver Jan 11, 2022
98701c9
Fix shared connections with simulated failures (cross-test noise)
NickCraver Jan 11, 2022
377c813
Compensate for delay removal
NickCraver Jan 11, 2022
d9c68e1
Add logging to pubsub methods
NickCraver Jan 11, 2022
3f6e030
Add logging to PubSubGetAllCorrectOrder
NickCraver Jan 11, 2022
b63648a
Tidy exception messages
NickCraver Jan 11, 2022
148c975
Eliminate writer here
NickCraver Jan 12, 2022
78ebf5e
Merge remote-tracking branch 'origin/main' into craver/pub-sub-issues
NickCraver Jan 18, 2022
8e168d9
Merge remote-tracking branch 'origin/main' into craver/pub-sub-issues
NickCraver Jan 18, 2022
21d36f0
Merge remote-tracking branch 'origin/main' into craver/pub-sub-issues
NickCraver Jan 19, 2022
144c22e
Merge remote-tracking branch 'origin/main' into craver/pub-sub-issues
NickCraver Jan 20, 2022
92cecfc
WIP: This could all be a bad idea
NickCraver Jan 20, 2022
7d7f020
Gap commit
NickCraver Jan 20, 2022
f247980
Pub/Sub: default to 3.0, fix PING, fix server selection in cluster, a…
NickCraver Jan 20, 2022
6ecde2a
Include PING routing
NickCraver Jan 20, 2022
f91e4c5
Revert testing change
NickCraver Jan 20, 2022
bca9de0
Merge remote-tracking branch 'origin/main' into craver/pub-sub-prep
NickCraver Jan 20, 2022
5a6db1c
Merge branch 'craver/pub-sub-prep' into craver/pub-sub-issues
NickCraver Jan 20, 2022
daa1b9c
Revert that bandaid test
NickCraver Jan 20, 2022
f36b6d9
Merge remote-tracking branch 'origin/craver/pub-sub-prep' into craver…
NickCraver Jan 20, 2022
70e1735
Nope.
NickCraver Jan 20, 2022
a814231
Bits
NickCraver Jan 20, 2022
1d4b4ad
Sync work stop commit (moving to laptop!)
NickCraver Jan 21, 2022
5de45a2
Tests: profiler logging made easier
NickCraver Jan 22, 2022
64565dd
Tests: use profiling and add more logging
NickCraver Jan 22, 2022
00f851c
Pub/Sub: Register immediately, but complete async
NickCraver Jan 22, 2022
029c2a3
Pre-clear rather than await
NickCraver Jan 22, 2022
341f532
Fix more things
NickCraver Jan 22, 2022
48127a1
Merge branch 'craver/pub-sub-wip' into craver/pub-sub-issues
NickCraver Jan 22, 2022
5dbf575
Add logging to TestPublishWithSubscribers
NickCraver Jan 22, 2022
237848f
Merge remote-tracking branch 'origin/main' into craver/pub-sub-issues
NickCraver Jan 22, 2022
51c927b
Light up more pubsub tests
NickCraver Jan 23, 2022
20d2d28
Several subtle but big changes
NickCraver Jan 23, 2022
814b401
More PubSub logging
NickCraver Jan 23, 2022
667aa34
TestPatternPubSub: give reception a moment
NickCraver Jan 23, 2022
cf96bba
Moar!
NickCraver Jan 23, 2022
a6be64a
Add logging to Issue1101 pipe
NickCraver Jan 23, 2022
31b38dc
PubSubGetAllAnyOrder: don't share conn
NickCraver Jan 23, 2022
79e5090
TextWriterOutputHelper: fix end-of-tests race case
NickCraver Jan 23, 2022
8ea3b6d
Bump Docker images to 6.2.6
NickCraver Jan 23, 2022
d9caedb
Nick, you idiot.
NickCraver Jan 23, 2022
3fa77a4
Fix log message too
NickCraver Jan 23, 2022
04233fe
Sentinel: Fire and Forget on startup
NickCraver Jan 23, 2022
ff54012
ExplicitPublishMode: remove delay
NickCraver Jan 23, 2022
84439d4
Cleanup and comments!
NickCraver Jan 23, 2022
60d9b80
More comments!
NickCraver Jan 23, 2022
c6fb569
Annnnnd the other Sentinel one
NickCraver Jan 23, 2022
c0206fa
Start PubSubMultiserver
NickCraver Jan 24, 2022
b484e5f
Tests, yay!
NickCraver Jan 24, 2022
837a654
Assert up front
NickCraver Jan 24, 2022
b0001ab
PubSub tests: log everything
NickCraver Jan 24, 2022
1bc971d
Primary/Replica tests
NickCraver Jan 25, 2022
907cd20
Merge remote-tracking branch 'origin/main' into craver/pub-sub-issues
NickCraver Jan 25, 2022
9c31958
Fix PubSub tests: can't share that connection yo
NickCraver Jan 25, 2022
eed3ba0
Remove the .NET 5.0 from Windows build too...
NickCraver Jan 25, 2022
2fe8d13
Sentinel: account for multi-suite failover states
NickCraver Jan 25, 2022
d234235
ExecuteWithUnsubscribeViaSubscriber: don't share conn
NickCraver Jan 25, 2022
33f52af
Re-disable TestMassivePublishWithWithoutFlush_Local
NickCraver Jan 25, 2022
6140700
Merge remote-tracking branch 'origin/main' into craver/pub-sub-issues
NickCraver Jan 25, 2022
dde51a6
Add initial release notes
NickCraver Jan 25, 2022
0465ee1
Update publish docs
NickCraver Jan 28, 2022
3d9b878
Merge branch 'craver/pub-sub-issues' of https://github.com/StackExcha…
NickCraver Jan 31, 2022
169a173
ReconfigureAsyns: FireAndForget for subscription re-establish
NickCraver Jan 31, 2022
1 change: 0 additions & 1 deletion .github/workflows/CI.yml
@@ -50,7 +50,6 @@ jobs:
 with:
 dotnet-version: |
 3.1.x
-5.0.x
 6.0.x
 - name: .NET Build
 run: dotnet build Build.csproj -c Release /p:CI=true
5 changes: 5 additions & 0 deletions docs/ReleaseNotes.md
@@ -9,6 +9,11 @@
 - Moved tiebreaker fetching in connections into the handshake phase (streamline + simplification) (#1931 via NickCraver)
 - Fixed potential disposed object usage around Arenas (pulling in [Piplines.Sockets.Unofficial#63](https://github.com/mgravell/Pipelines.Sockets.Unofficial/pull/63) by MarcGravell)
 - Adds thread pool work item stats to exception messages to help diagnose contention (#1964 via NickCraver)
+- Overhauls pub/sub implementation for correctness (#1947 via NickCraver)
+  - Fixes a race in subscribing right after connected
+  - Fixes a race in subscribing immediately before a publish
+  - Fixes subscription routing on clusters (spreading instead of choosing 1 node)
+  - More correctly reconnects subscriptions on connection failures, including to other endpoints

 ## 2.2.88

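The cluster routing fix in the release notes depends on a channel name mapping to a hash slot, which this PR wires up through `GetHashSlot` on channel commands. As a rough illustration only (this is not code from the PR), the sketch below computes a slot the way the Redis Cluster specification describes it: CRC16 (XMODEM variant) of the name, modulo 16384, hashing only a non-empty `{...}` tag when one is present. The `ClusterSlotSketch` class and its method names are hypothetical, and the sketch assumes that the library's `ServerSelectionStrategy.HashSlot` follows the same specification.

```csharp
using System;
using System.Text;

// Hypothetical helper for illustration; not part of StackExchange.Redis.
public static class ClusterSlotSketch
{
    // Redis Cluster divides the key space into 16384 slots.
    private const int SlotCount = 16384;

    // CRC16, XMODEM variant: polynomial 0x1021, initial value 0, no bit reflection.
    public static ushort Crc16(ReadOnlySpan<byte> data)
    {
        ushort crc = 0;
        foreach (byte b in data)
        {
            crc ^= (ushort)(b << 8);
            for (int i = 0; i < 8; i++)
            {
                // Shift left one bit; apply the polynomial when the top bit was set.
                crc = (crc & 0x8000) != 0
                    ? unchecked((ushort)((crc << 1) ^ 0x1021))
                    : unchecked((ushort)(crc << 1));
            }
        }
        return crc;
    }

    // Maps a channel (or key) name to a slot in the range 0..16383.
    public static int HashSlot(string channel)
    {
        byte[] bytes = Encoding.UTF8.GetBytes(channel);

        // Hash tag rule: if there is a '{' followed later by a '}' with at least
        // one byte in between, only the bytes between them are hashed.
        int open = Array.IndexOf(bytes, (byte)'{');
        if (open >= 0)
        {
            int close = Array.IndexOf(bytes, (byte)'}', open + 1);
            if (close > open + 1)
            {
                return Crc16(bytes.AsSpan(open + 1, close - open - 1)) % SlotCount;
            }
        }

        // No usable hash tag: hash the whole name.
        return Crc16(bytes) % SlotCount;
    }
}
```

For example, `ClusterSlotSketch.HashSlot("{orders}.created")` and `ClusterSlotSketch.HashSlot("{orders}.shipped")` return the same slot, because only `orders` is hashed in both cases.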
9 changes: 5 additions & 4 deletions src/StackExchange.Redis/ConnectionMultiplexer.cs
@@ -1898,14 +1898,15 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, LogP
 }
 if (!first)
 {
-    long subscriptionChanges = ValidateSubscriptions();
+    // Calling the sync path here because it's all fire and forget
+    long subscriptionChanges = EnsureSubscriptions(CommandFlags.FireAndForget);
     if (subscriptionChanges == 0)
     {
         log?.WriteLine("No subscription changes necessary");
     }
     else
     {
-        log?.WriteLine($"Subscriptions reconfigured: {subscriptionChanges}");
+        log?.WriteLine($"Subscriptions attempting reconnect: {subscriptionChanges}");
     }
 }
 if (showStats)
@@ -2325,7 +2326,7 @@ internal void InitializeSentinel(LogProxy logProxy)
                 }
             }
         }
-    });
+    }, CommandFlags.FireAndForget);
 }

 // If we lose connection to a sentinel server,
@@ -2344,7 +2345,7 @@ internal void InitializeSentinel(LogProxy logProxy)
         {
             string[] messageParts = ((string)message).Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
             UpdateSentinelAddressList(messageParts[0]);
-        });
+        }, CommandFlags.FireAndForget);
     }
 }

5 changes: 4 additions & 1 deletion src/StackExchange.Redis/Interfaces/IDatabase.cs
@@ -851,7 +851,10 @@ public interface IDatabase : IRedis, IDatabaseAsync
 /// <param name="channel">The channel to publish to.</param>
 /// <param name="message">The message to send.</param>
 /// <param name="flags">The flags to use for this operation.</param>
-/// <returns>The number of clients that received the message.</returns>
+/// <returns>
+/// The number of clients that received the message *on the destination server*,
+/// note that this doesn't mean much in a cluster as clients can get the message through other nodes.
+/// </returns>
 /// <remarks>https://redis.io/commands/publish</remarks>
 long Publish(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None);

5 changes: 4 additions & 1 deletion src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs
@@ -828,7 +828,10 @@ public interface IDatabaseAsync : IRedisAsync
 /// <param name="channel">The channel to publish to.</param>
 /// <param name="message">The message to send.</param>
 /// <param name="flags">The flags to use for this operation.</param>
-/// <returns>The number of clients that received the message.</returns>
+/// <returns>
+/// The number of clients that received the message *on the destination server*,
+/// note that this doesn't mean much in a cluster as clients can get the message through other nodes.
+/// </returns>
 /// <remarks>https://redis.io/commands/publish</remarks>
 Task<long> PublishAsync(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None);

10 changes: 8 additions & 2 deletions src/StackExchange.Redis/Interfaces/ISubscriber.cs
@@ -38,7 +38,10 @@ public interface ISubscriber : IRedis
 /// <param name="channel">The channel to publish to.</param>
 /// <param name="message">The message to publish.</param>
 /// <param name="flags">The command flags to use.</param>
-/// <returns>the number of clients that received the message.</returns>
+/// <returns>
+/// The number of clients that received the message *on the destination server*,
+/// note that this doesn't mean much in a cluster as clients can get the message through other nodes.
+/// </returns>
 /// <remarks>https://redis.io/commands/publish</remarks>
 long Publish(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None);

@@ -48,7 +51,10 @@ public interface ISubscriber : IRedis
 /// <param name="channel">The channel to publish to.</param>
 /// <param name="message">The message to publish.</param>
 /// <param name="flags">The command flags to use.</param>
-/// <returns>the number of clients that received the message.</returns>
+/// <returns>
+/// The number of clients that received the message *on the destination server*,
+/// note that this doesn't mean much in a cluster as clients can get the message through other nodes.
+/// </returns>
 /// <remarks>https://redis.io/commands/publish</remarks>
 Task<long> PublishAsync(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None);

4 changes: 3 additions & 1 deletion src/StackExchange.Redis/Message.cs
@@ -589,7 +589,7 @@ internal bool TrySetResult<T>(T value)
 internal void SetEnqueued(PhysicalConnection connection)
 {
     SetWriteTime();
-    performance?.SetEnqueued();
+    performance?.SetEnqueued(connection?.BridgeCouldBeNull?.ConnectionType);
     _enqueuedTo = connection;
     if (connection == null)
     {
@@ -735,6 +735,8 @@ protected CommandChannelBase(int db, CommandFlags flags, RedisCommand command, i
     }

     public override string CommandAndKey => Command + " " + Channel;
+
+    public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy) => serverSelectionStrategy.HashSlot(Channel);
 }

 internal abstract class CommandKeyBase : Message
67 changes: 0 additions & 67 deletions src/StackExchange.Redis/PhysicalBridge.cs
@@ -8,7 +8,6 @@
 using System.Threading.Channels;
 using System.Threading.Tasks;
 using static StackExchange.Redis.ConnectionMultiplexer;
-using PendingSubscriptionState = global::StackExchange.Redis.ConnectionMultiplexer.Subscription.PendingSubscriptionState;
 #if !NETCOREAPP
 using Pipelines.Sockets.Unofficial.Threading;
 using static Pipelines.Sockets.Unofficial.Threading.MutexSlim;
@@ -102,7 +101,6 @@ public enum State : byte
 public void Dispose()
 {
     isDisposed = true;
-    ShutdownSubscriptionQueue();
     using (var tmp = physical)
     {
         physical = null;
@@ -220,71 +218,6 @@ internal void GetCounters(ConnectionCounters counters)
     physical?.GetCounters(counters);
 }

-private Channel<PendingSubscriptionState> _subscriptionBackgroundQueue;
-private static readonly UnboundedChannelOptions s_subscriptionQueueOptions = new UnboundedChannelOptions
-{
-    AllowSynchronousContinuations = false, // we do *not* want the async work to end up on the caller's thread
-    SingleReader = true, // only one reader will be started per channel
-    SingleWriter = true, // writes will be synchronized, because order matters
-};
-
-private Channel<PendingSubscriptionState> GetSubscriptionQueue()
-{
-    var queue = _subscriptionBackgroundQueue;
-    if (queue == null)
-    {
-        queue = Channel.CreateUnbounded<PendingSubscriptionState>(s_subscriptionQueueOptions);
-        var existing = Interlocked.CompareExchange(ref _subscriptionBackgroundQueue, queue, null);
-
-        if (existing != null) return existing; // we didn't win, but that's fine
-
-        // we won (_subqueue is now queue)
-        // this means we have a new channel without a reader; let's fix that!
-        Task.Run(() => ExecuteSubscriptionLoop());
-    }
-    return queue;
-}
-
-private void ShutdownSubscriptionQueue()
-{
-    try
-    {
-        Interlocked.CompareExchange(ref _subscriptionBackgroundQueue, null, null)?.Writer.TryComplete();
-    }
-    catch { }
-}
-
-private async Task ExecuteSubscriptionLoop() // pushes items that have been enqueued over the bridge
-{
-    // note: this will execute on the default pool rather than our dedicated pool; I'm... OK with this
-    var queue = _subscriptionBackgroundQueue ?? Interlocked.CompareExchange(ref _subscriptionBackgroundQueue, null, null); // just to be sure we can read it!
-    try
-    {
-        while (await queue.Reader.WaitToReadAsync().ForAwait() && queue.Reader.TryRead(out var next))
-        {
-            try
-            {
-                // Treat these commands as background/handshake and do not allow queueing to backlog
-                if ((await TryWriteAsync(next.Message, next.IsReplica).ForAwait()) != WriteResult.Success)
-                {
-                    next.Abort();
-                }
-            }
-            catch (Exception ex)
-            {
-                next.Fail(ex);
-            }
-        }
-    }
-    catch (Exception ex)
-    {
-        Multiplexer.OnInternalError(ex, ServerEndPoint?.EndPoint, ConnectionType);
-    }
-}
-
-internal bool TryEnqueueBackgroundSubscriptionWrite(in PendingSubscriptionState state)
-    => !isDisposed && (_subscriptionBackgroundQueue ?? GetSubscriptionQueue()).Writer.TryWrite(state);
-
 internal readonly struct BridgeStatus
 {
     /// <summary>
23 changes: 11 additions & 12 deletions src/StackExchange.Redis/Profiling/ProfiledCommand.cs
@@ -55,6 +55,7 @@ private static TimeSpan GetElapsedTime(long timestampDelta)
 private long RequestSentTimeStamp;
 private long ResponseReceivedTimeStamp;
 private long CompletedTimeStamp;
+private ConnectionType? ConnectionType;

 private readonly ProfilingSession PushToWhenFinished;

@@ -86,7 +87,11 @@ public void SetMessage(Message msg)
     MessageCreatedTimeStamp = msg.CreatedTimestamp;
 }

-public void SetEnqueued() => SetTimestamp(ref EnqueuedTimeStamp);
+public void SetEnqueued(ConnectionType? connType)
+{
+    SetTimestamp(ref EnqueuedTimeStamp);
+    ConnectionType = connType;
+}

 public void SetRequestSent() => SetTimestamp(ref RequestSentTimeStamp);

@@ -117,16 +122,10 @@ public void SetCompleted()
         }

         public override string ToString() =>
-            $@"EndPoint = {EndPoint}
-Db = {Db}
-Command = {Command}
-CommandCreated = {CommandCreated:u}
-CreationToEnqueued = {CreationToEnqueued}
-EnqueuedToSending = {EnqueuedToSending}
-SentToResponse = {SentToResponse}
-ResponseToCompletion = {ResponseToCompletion}
-ElapsedTime = {ElapsedTime}
-Flags = {Flags}
-RetransmissionOf = ({RetransmissionOf?.ToString() ?? "nothing"})";
+            $@"{Command} (DB: {Db}, Flags: {Flags})
+EndPoint = {EndPoint} ({ConnectionType})
+Created = {CommandCreated:HH:mm:ss.ffff}
+ElapsedTime = {ElapsedTime.TotalMilliseconds} ms (CreationToEnqueued: {CreationToEnqueued.TotalMilliseconds} ms, EnqueuedToSending: {EnqueuedToSending.TotalMilliseconds} ms, SentToResponse: {SentToResponse.TotalMilliseconds} ms, ResponseToCompletion = {ResponseToCompletion.TotalMilliseconds} ms){(RetransmissionOf != null ? @"
+RetransmissionOf = " + RetransmissionOf : "")}";
     }
 }