Pub/Sub fixes for subscribe/re-subscribe (#1947)
This breaks the pub/sub work out of #1912 into its own change. It relates to several issues and, more generally, to handling resubscriptions on reconnect.

Issues: #1110, #1586, #1830, #1835

There are a few things in play we're investigating:
- [x] Subscription heartbeat not going over the subscription connection (due to `PING` and `GetBridge`)
- [x] Subscriptions not reconnecting at all (or potentially doing so and then unsubscribing, according to some issues)
- [x] Subscriptions always going to a single cluster node (due to `default(RedisKey)`)
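
For context on the third item: Redis Cluster assigns a key to one of 16384 slots using CRC16 (XMODEM variant) modulo 16384, hashing only the `{tag}` portion when a non-empty hash tag is present. The sketch below applies that calculation to a channel name to show how different channels can land on different slots. It is a standalone illustration, not the library's `ServerSelectionStrategy` code; `ClusterSlotSketch` and its members are hypothetical names.

```csharp
using System;
using System.Text;

// Hypothetical helper: the Redis Cluster slot calculation applied to a channel name.
public static class ClusterSlotSketch
{
    public const int SlotCount = 16384;

    // CRC16/XMODEM: polynomial 0x1021, initial value 0, no bit reflection.
    public static ushort Crc16(ReadOnlySpan<byte> data)
    {
        int crc = 0;
        foreach (byte b in data)
        {
            crc ^= b << 8;
            for (int bit = 0; bit < 8; bit++)
            {
                crc = (crc & 0x8000) != 0 ? ((crc << 1) ^ 0x1021) : (crc << 1);
                crc &= 0xFFFF; // keep the register at 16 bits
            }
        }
        return (ushort)crc;
    }

    public static int HashSlot(string channel)
    {
        ReadOnlySpan<byte> span = Encoding.UTF8.GetBytes(channel);

        // Hash tag rule: if there is a '{' followed by a '}' with at least one
        // byte between them, only that inner part is hashed.
        int open = span.IndexOf((byte)'{');
        if (open >= 0)
        {
            int close = span.Slice(open + 1).IndexOf((byte)'}');
            if (close > 0) span = span.Slice(open + 1, close);
        }
        return Crc16(span) % SlotCount;
    }
}
```

With a calculation like this, `ClusterSlotSketch.HashSlot("news")` and `ClusterSlotSketch.HashSlot("alerts")` are computed independently, so subscriptions need not all resolve to the same node.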

Overall this set of changes:
- Completely restructures how RedisSubscriber works
  - No more `PendingSubscriptionState` (`Subscription` has the needed bits to reconnect)
  - Cleaner method topology (in `RedisSubscriber`, rather than `Subscriber`, `RedisSubscriber`, and `ConnectionMultiplexer`)
    - By placing these on `RedisSubscriber`, we can cleanly use `ExecuteSync/Async` bits, get proper profiling, etc.
  - Proper sync/async split (rather than `Wait()` in sync paths)
- Changes how subscriptions work
  - The `Subscription` object is added to the `ConnectionMultiplexer` tracking immediately, but the command itself actually goes to the server and back (unless FireAndForget) before returning for proper ordering like other commands.
  - No more `Task.Run()` loop - we now ensure reconnects as part of the handshake
  - Subscriptions are marked as not having a server the moment a disconnect is fired
    - Question: Should we have a throttle around this for massive numbers of connections, or async it?
- Changes how connecting works
  - The connection completion handler now fires when the _second_ bridge/connection completes. This means we won't have `interactive` connected but `subscription` in an unknown state: both are connected before we fire the handler, so the moment we come back from connect, subscriptions are in business.
- Moves to a `ConcurrentDictionary` since we only need limited locking around this and we only have it once per multiplexer.
  - TODO: This needs eyes, we could shift it - implementation changed along the way where this isn't a critical detail
- Fixes the `TrackSubscriptionsProcessor` - this was never setting the result, but nobody noticed in 8 years because downstream code never cared.
  - Note: each `Subscription` has a processor instance (with minimal state) because when the subscription command comes back, _then_ we need to decide if it successfully registered (if it didn't, we need to keep recording that it has no successful server)
- `ConnectionMultiplexer` grew a `DefaultSubscriber` for running some commands without lots of method duplication, e.g. ensuring servers are connected.
- Overrides `GetHashSlot` on `CommandChannelBase` with the new `RedisChannel`-based methods so that it operates correctly
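
The tracking model described in the list above can be sketched roughly as follows: a subscription is registered immediately, loses its server the moment a disconnect fires, and is picked up again by an "ensure" pass that only touches subscriptions without a server. This is a simplified illustration under assumptions, not the code in this commit. `TrackedSubscription`, `SubscriptionTrackerSketch`, and the `trySubscribe` delegate (which stands in for sending `SUBSCRIBE` and returns `null` on failure) are all hypothetical.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical stand-in for the per-channel subscription state.
public sealed class TrackedSubscription
{
    private volatile string _currentServer; // null: no server has confirmed this subscription

    public TrackedSubscription(string channel) => Channel = channel;

    public string Channel { get; }
    public string CurrentServer => _currentServer;
    public bool IsConnected => _currentServer != null;

    public void SetServer(string server) => _currentServer = server;
    public void ClearServer() => _currentServer = null;
}

public sealed class SubscriptionTrackerSketch
{
    private readonly ConcurrentDictionary<string, TrackedSubscription> _subscriptions =
        new ConcurrentDictionary<string, TrackedSubscription>();

    // Track first, so a later reconnect pass can find the subscription
    // even if the initial command has not completed yet.
    public TrackedSubscription Track(string channel) =>
        _subscriptions.GetOrAdd(channel, c => new TrackedSubscription(c));

    // Mark every subscription on the failed server as having no server.
    public void OnDisconnected(string server)
    {
        foreach (var pair in _subscriptions)
        {
            if (pair.Value.CurrentServer == server) pair.Value.ClearServer();
        }
    }

    // Attempt to (re)subscribe anything without a server; returns the number of attempts.
    public long EnsureSubscriptions(Func<string, string> trySubscribe)
    {
        long attempts = 0;
        foreach (var pair in _subscriptions)
        {
            if (pair.Value.IsConnected) continue;
            attempts++;
            string server = trySubscribe(pair.Key); // null models a failed SUBSCRIBE
            if (server != null) pair.Value.SetServer(server);
        }
        return attempts;
    }
}
```

Only a successful attempt records a server, so a failed subscribe stays eligible for the next pass.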

Not directly related changes which helped here:
- Better profiler helpers for tests and profiler logging in them
- Re-enables a few `PubSub` tests that were unreliable before...but correctly so.

TODO: I'd like to add a few more test scenarios here:
- [x] Simple Subscribe/Publish/await Until/check pattern to ensure back-to-back subscribe/publish works well
- [x] Cluster connection failure and subscriptions moving to another node
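
The first scenario might look roughly like the sketch below: subscribe, publish immediately afterwards, then wait (with a timeout) for the handler to observe the message. It uses the public `ISubscriber` API and needs a reachable Redis server. The class name, channel name, and 5 second timeout are illustrative assumptions, not taken from the test suite in this commit.

```csharp
using System;
using System.Threading.Tasks;
using StackExchange.Redis;

public static class SubscribeThenPublishSketch
{
    // configuration is a connection string such as "localhost:6379" (assumed to be reachable).
    public static async Task<bool> RoundTripAsync(string configuration)
    {
        using var muxer = await ConnectionMultiplexer.ConnectAsync(configuration);
        ISubscriber sub = muxer.GetSubscriber();

        var channel = new RedisChannel("sketch:roundtrip", RedisChannel.PatternMode.Literal);
        var received = new TaskCompletionSource<RedisValue>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        // Awaiting the subscribe means the command has gone to the server and back,
        // so a publish issued right after it should reach the handler.
        await sub.SubscribeAsync(channel, (_, value) => received.TrySetResult(value));
        await sub.PublishAsync(channel, "hello");

        // Wait for the handler, but give up after a timeout instead of hanging.
        Task finished = await Task.WhenAny(received.Task, Task.Delay(TimeSpan.FromSeconds(5)));
        return finished == received.Task && (string)received.Task.Result == "hello";
    }
}
```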

To consider:
- [x] Subscription await loop from EnsureSubscriptionsAsync and connection impact on large reconnect situations
   - In a reconnect case, this is background and only the nodes affected have any latency...but still.
- [ ] TODOs in code around variadic commands, e.g. re-subscribing with far fewer commands by using `SUBSCRIBE <key1> <key2>...`
   - In cluster, we'd have to batch per slot...or just go for the first available node
   - ...but if we go for the first available node, the semantics of `IsConnected` are slightly off in the not connected (`CurrentServer is null`) case, because we'd say we're connected to where it _would_ go even though that'd be non-deterministic without hashslot batching. I think this is really minor and shouldn't affect our decision.
- [x] `ConcurrentDictionary` vs. returning to locks around a `Dictionary`
   - ...but if we have to lock on firing consumption of handlers anyway, concurrency overhead is probably a wash.
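
The per-slot batching idea in the variadic TODO could be sketched as below: group the channels by slot and emit one `SUBSCRIBE` per group. This is only an illustration of the grouping step. `ResubscribeBatchSketch` is a hypothetical name, the slot function is passed in as a delegate rather than tied to any library type, and the commands are rendered as plain strings.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ResubscribeBatchSketch
{
    // Builds one variadic SUBSCRIBE per slot instead of one command per channel.
    public static List<string> BuildCommands(IEnumerable<string> channels, Func<string, int> slotOf)
    {
        return channels
            .GroupBy(slotOf)                 // channels in the same slot share a command
            .OrderBy(group => group.Key)     // deterministic order for readability
            .Select(group => "SUBSCRIBE " + string.Join(" ", group))
            .ToList();
    }
}
```

For N channels spread over K slots, this sends K commands instead of N, which is where the saving on large reconnects would come from.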
NickCraver authored Feb 4, 2022
1 parent b81c5da commit d9b3c58
Showing 26 changed files with 694 additions and 426 deletions.
1 change: 0 additions & 1 deletion .github/workflows/CI.yml
@@ -50,7 +50,6 @@ jobs:
with:
dotnet-version: |
3.1.x
5.0.x
6.0.x
- name: .NET Build
run: dotnet build Build.csproj -c Release /p:CI=true
5 changes: 5 additions & 0 deletions docs/ReleaseNotes.md
@@ -9,6 +9,11 @@
  - Moved tiebreaker fetching in connections into the handshake phase (streamline + simplification) (#1931 via NickCraver)
  - Fixed potential disposed object usage around Arenas (pulling in [Piplines.Sockets.Unofficial#63](https://github.com/mgravell/Pipelines.Sockets.Unofficial/pull/63) by MarcGravell)
  - Adds thread pool work item stats to exception messages to help diagnose contention (#1964 via NickCraver)
+ - Overhauls pub/sub implementation for correctness (#1947 via NickCraver)
+   - Fixes a race in subscribing right after connected
+   - Fixes a race in subscribing immediately before a publish
+   - Fixes subscription routing on clusters (spreading instead of choosing 1 node)
+   - More correctly reconnects subscriptions on connection failures, including to other endpoints

## 2.2.88

9 changes: 5 additions & 4 deletions src/StackExchange.Redis/ConnectionMultiplexer.cs
@@ -1898,14 +1898,15 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, LogP
}
if (!first)
{
- long subscriptionChanges = ValidateSubscriptions();
+ // Calling the sync path here because it's all fire and forget
+ long subscriptionChanges = EnsureSubscriptions(CommandFlags.FireAndForget);
if (subscriptionChanges == 0)
{
log?.WriteLine("No subscription changes necessary");
}
else
{
- log?.WriteLine($"Subscriptions reconfigured: {subscriptionChanges}");
+ log?.WriteLine($"Subscriptions attempting reconnect: {subscriptionChanges}");
}
}
if (showStats)
@@ -2325,7 +2326,7 @@ internal void InitializeSentinel(LogProxy logProxy)
}
}
}
- });
+ }, CommandFlags.FireAndForget);
}

// If we lose connection to a sentinel server,
@@ -2344,7 +2345,7 @@ internal void InitializeSentinel(LogProxy logProxy)
{
string[] messageParts = ((string)message).Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
UpdateSentinelAddressList(messageParts[0]);
- });
+ }, CommandFlags.FireAndForget);
}
}

5 changes: 4 additions & 1 deletion src/StackExchange.Redis/Interfaces/IDatabase.cs
@@ -851,7 +851,10 @@ public interface IDatabase : IRedis, IDatabaseAsync
/// <param name="channel">The channel to publish to.</param>
/// <param name="message">The message to send.</param>
/// <param name="flags">The flags to use for this operation.</param>
- /// <returns>The number of clients that received the message.</returns>
+ /// <returns>
+ /// The number of clients that received the message *on the destination server*,
+ /// note that this doesn't mean much in a cluster as clients can get the message through other nodes.
+ /// </returns>
/// <remarks>https://redis.io/commands/publish</remarks>
long Publish(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None);

5 changes: 4 additions & 1 deletion src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs
@@ -828,7 +828,10 @@ public interface IDatabaseAsync : IRedisAsync
/// <param name="channel">The channel to publish to.</param>
/// <param name="message">The message to send.</param>
/// <param name="flags">The flags to use for this operation.</param>
- /// <returns>The number of clients that received the message.</returns>
+ /// <returns>
+ /// The number of clients that received the message *on the destination server*,
+ /// note that this doesn't mean much in a cluster as clients can get the message through other nodes.
+ /// </returns>
/// <remarks>https://redis.io/commands/publish</remarks>
Task<long> PublishAsync(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None);

10 changes: 8 additions & 2 deletions src/StackExchange.Redis/Interfaces/ISubscriber.cs
@@ -38,7 +38,10 @@ public interface ISubscriber : IRedis
/// <param name="channel">The channel to publish to.</param>
/// <param name="message">The message to publish.</param>
/// <param name="flags">The command flags to use.</param>
- /// <returns>the number of clients that received the message.</returns>
+ /// <returns>
+ /// The number of clients that received the message *on the destination server*,
+ /// note that this doesn't mean much in a cluster as clients can get the message through other nodes.
+ /// </returns>
/// <remarks>https://redis.io/commands/publish</remarks>
long Publish(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None);

@@ -48,7 +51,10 @@ public interface ISubscriber : IRedis
/// <param name="channel">The channel to publish to.</param>
/// <param name="message">The message to publish.</param>
/// <param name="flags">The command flags to use.</param>
- /// <returns>the number of clients that received the message.</returns>
+ /// <returns>
+ /// The number of clients that received the message *on the destination server*,
+ /// note that this doesn't mean much in a cluster as clients can get the message through other nodes.
+ /// </returns>
/// <remarks>https://redis.io/commands/publish</remarks>
Task<long> PublishAsync(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None);

4 changes: 3 additions & 1 deletion src/StackExchange.Redis/Message.cs
@@ -589,7 +589,7 @@ internal bool TrySetResult<T>(T value)
internal void SetEnqueued(PhysicalConnection connection)
{
SetWriteTime();
- performance?.SetEnqueued();
+ performance?.SetEnqueued(connection?.BridgeCouldBeNull?.ConnectionType);
_enqueuedTo = connection;
if (connection == null)
{
@@ -735,6 +735,8 @@ protected CommandChannelBase(int db, CommandFlags flags, RedisCommand command, i
}

public override string CommandAndKey => Command + " " + Channel;
+
+ public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy) => serverSelectionStrategy.HashSlot(Channel);
}

internal abstract class CommandKeyBase : Message
67 changes: 0 additions & 67 deletions src/StackExchange.Redis/PhysicalBridge.cs
@@ -8,7 +8,6 @@
using System.Threading.Channels;
using System.Threading.Tasks;
using static StackExchange.Redis.ConnectionMultiplexer;
- using PendingSubscriptionState = global::StackExchange.Redis.ConnectionMultiplexer.Subscription.PendingSubscriptionState;
#if !NETCOREAPP
using Pipelines.Sockets.Unofficial.Threading;
using static Pipelines.Sockets.Unofficial.Threading.MutexSlim;
@@ -102,7 +101,6 @@ public enum State : byte
public void Dispose()
{
isDisposed = true;
- ShutdownSubscriptionQueue();
using (var tmp = physical)
{
physical = null;
@@ -220,71 +218,6 @@ internal void GetCounters(ConnectionCounters counters)
physical?.GetCounters(counters);
}

- private Channel<PendingSubscriptionState> _subscriptionBackgroundQueue;
- private static readonly UnboundedChannelOptions s_subscriptionQueueOptions = new UnboundedChannelOptions
- {
- AllowSynchronousContinuations = false, // we do *not* want the async work to end up on the caller's thread
- SingleReader = true, // only one reader will be started per channel
- SingleWriter = true, // writes will be synchronized, because order matters
- };
-
- private Channel<PendingSubscriptionState> GetSubscriptionQueue()
- {
- var queue = _subscriptionBackgroundQueue;
- if (queue == null)
- {
- queue = Channel.CreateUnbounded<PendingSubscriptionState>(s_subscriptionQueueOptions);
- var existing = Interlocked.CompareExchange(ref _subscriptionBackgroundQueue, queue, null);
-
- if (existing != null) return existing; // we didn't win, but that's fine
-
- // we won (_subqueue is now queue)
- // this means we have a new channel without a reader; let's fix that!
- Task.Run(() => ExecuteSubscriptionLoop());
- }
- return queue;
- }
-
- private void ShutdownSubscriptionQueue()
- {
- try
- {
- Interlocked.CompareExchange(ref _subscriptionBackgroundQueue, null, null)?.Writer.TryComplete();
- }
- catch { }
- }
-
- private async Task ExecuteSubscriptionLoop() // pushes items that have been enqueued over the bridge
- {
- // note: this will execute on the default pool rather than our dedicated pool; I'm... OK with this
- var queue = _subscriptionBackgroundQueue ?? Interlocked.CompareExchange(ref _subscriptionBackgroundQueue, null, null); // just to be sure we can read it!
- try
- {
- while (await queue.Reader.WaitToReadAsync().ForAwait() && queue.Reader.TryRead(out var next))
- {
- try
- {
- // Treat these commands as background/handshake and do not allow queueing to backlog
- if ((await TryWriteAsync(next.Message, next.IsReplica).ForAwait()) != WriteResult.Success)
- {
- next.Abort();
- }
- }
- catch (Exception ex)
- {
- next.Fail(ex);
- }
- }
- }
- catch (Exception ex)
- {
- Multiplexer.OnInternalError(ex, ServerEndPoint?.EndPoint, ConnectionType);
- }
- }
-
- internal bool TryEnqueueBackgroundSubscriptionWrite(in PendingSubscriptionState state)
- => !isDisposed && (_subscriptionBackgroundQueue ?? GetSubscriptionQueue()).Writer.TryWrite(state);
-
internal readonly struct BridgeStatus
{
/// <summary>
23 changes: 11 additions & 12 deletions src/StackExchange.Redis/Profiling/ProfiledCommand.cs
@@ -55,6 +55,7 @@ private static TimeSpan GetElapsedTime(long timestampDelta)
private long RequestSentTimeStamp;
private long ResponseReceivedTimeStamp;
private long CompletedTimeStamp;
+ private ConnectionType? ConnectionType;

private readonly ProfilingSession PushToWhenFinished;

@@ -86,7 +87,11 @@ public void SetMessage(Message msg)
MessageCreatedTimeStamp = msg.CreatedTimestamp;
}

- public void SetEnqueued() => SetTimestamp(ref EnqueuedTimeStamp);
+ public void SetEnqueued(ConnectionType? connType)
+ {
+ SetTimestamp(ref EnqueuedTimeStamp);
+ ConnectionType = connType;
+ }

public void SetRequestSent() => SetTimestamp(ref RequestSentTimeStamp);

@@ -117,16 +122,10 @@ public void SetCompleted()
}

public override string ToString() =>
- $@"EndPoint = {EndPoint}
- Db = {Db}
- Command = {Command}
- CommandCreated = {CommandCreated:u}
- CreationToEnqueued = {CreationToEnqueued}
- EnqueuedToSending = {EnqueuedToSending}
- SentToResponse = {SentToResponse}
- ResponseToCompletion = {ResponseToCompletion}
- ElapsedTime = {ElapsedTime}
- Flags = {Flags}
- RetransmissionOf = ({RetransmissionOf?.ToString() ?? "nothing"})";
+ $@"{Command} (DB: {Db}, Flags: {Flags})
+ EndPoint = {EndPoint} ({ConnectionType})
+ Created = {CommandCreated:HH:mm:ss.ffff}
+ ElapsedTime = {ElapsedTime.TotalMilliseconds} ms (CreationToEnqueued: {CreationToEnqueued.TotalMilliseconds} ms, EnqueuedToSending: {EnqueuedToSending.TotalMilliseconds} ms, SentToResponse: {SentToResponse.TotalMilliseconds} ms, ResponseToCompletion = {ResponseToCompletion.TotalMilliseconds} ms){(RetransmissionOf != null ? @"
+ RetransmissionOf = " + RetransmissionOf : "")}";
}
}