Skip to content

Commit

Permalink
Add lock super stream send
Browse files Browse the repository at this point in the history
Add the SuperStream interface

Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
  • Loading branch information
Gsantomaggio committed Jan 18, 2024
1 parent 8d3ff1b commit 6e5b582
Show file tree
Hide file tree
Showing 10 changed files with 109 additions and 36 deletions.
5 changes: 5 additions & 0 deletions RabbitMQ.Stream.Client/IConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@

namespace RabbitMQ.Stream.Client;

public interface ISuperStreamConsumer : IConsumer
{
public Task ReconnectPartition(StreamInfo streamInfo);
}

public interface IConsumer : IClosable
{
public Task StoreOffset(ulong offset);
Expand Down
5 changes: 4 additions & 1 deletion RabbitMQ.Stream.Client/IProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@

namespace RabbitMQ.Stream.Client;

public interface ISuperStreamProducer : IProducer
{
public Task ReconnectPartition(StreamInfo streamInfo);
}
// <summary>
// Producer interface for sending messages to a stream.
// There are different types of producers:
Expand Down Expand Up @@ -83,7 +87,6 @@ public record ProducerFilter

public record IProducerConfig : EntityCommonConfig, INamedEntity
{

public string Reference { get; set; }
public int MaxInFlight { get; set; } = 1_000;
public string ClientProvidedName { get; set; } = "dotnet-stream-raw-producer";
Expand Down
4 changes: 0 additions & 4 deletions RabbitMQ.Stream.Client/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -720,9 +720,7 @@ RabbitMQ.Stream.Client.StreamSystem
RabbitMQ.Stream.Client.StreamSystem.Close() -> System.Threading.Tasks.Task
RabbitMQ.Stream.Client.StreamSystem.CreateRawConsumer(RabbitMQ.Stream.Client.RawConsumerConfig rawConsumerConfig, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IConsumer>
RabbitMQ.Stream.Client.StreamSystem.CreateRawProducer(RabbitMQ.Stream.Client.RawProducerConfig rawProducerConfig, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IProducer>
RabbitMQ.Stream.Client.StreamSystem.CreateRawSuperStreamProducer(RabbitMQ.Stream.Client.RawSuperStreamProducerConfig rawSuperStreamProducerConfig, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IProducer>
RabbitMQ.Stream.Client.StreamSystem.CreateStream(RabbitMQ.Stream.Client.StreamSpec spec) -> System.Threading.Tasks.Task
RabbitMQ.Stream.Client.StreamSystem.CreateSuperStreamConsumer(RabbitMQ.Stream.Client.RawSuperStreamConsumerConfig rawSuperStreamConsumerConfig, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IConsumer>
RabbitMQ.Stream.Client.StreamSystem.DeleteStream(string stream) -> System.Threading.Tasks.Task
RabbitMQ.Stream.Client.StreamSystem.IsClosed.get -> bool
RabbitMQ.Stream.Client.StreamSystem.QueryOffset(string reference, string stream) -> System.Threading.Tasks.Task<ulong>
Expand Down Expand Up @@ -773,8 +771,6 @@ static RabbitMQ.Stream.Client.LeaderLocator.ClientLocal.get -> RabbitMQ.Stream.C
static RabbitMQ.Stream.Client.LeaderLocator.LeastLeaders.get -> RabbitMQ.Stream.Client.LeaderLocator
static RabbitMQ.Stream.Client.LeaderLocator.Random.get -> RabbitMQ.Stream.Client.LeaderLocator
static RabbitMQ.Stream.Client.Message.From(ref System.Buffers.SequenceReader<byte> reader, uint len) -> RabbitMQ.Stream.Client.Message
static RabbitMQ.Stream.Client.RawSuperStreamConsumer.Create(RabbitMQ.Stream.Client.RawSuperStreamConsumerConfig rawSuperStreamConsumerConfig, System.Collections.Generic.IDictionary<string, RabbitMQ.Stream.Client.StreamInfo> streamInfos, RabbitMQ.Stream.Client.ClientParameters clientParameters, Microsoft.Extensions.Logging.ILogger logger = null) -> RabbitMQ.Stream.Client.IConsumer
static RabbitMQ.Stream.Client.RawSuperStreamProducer.Create(RabbitMQ.Stream.Client.RawSuperStreamProducerConfig rawSuperStreamProducerConfig, System.Collections.Generic.IDictionary<string, RabbitMQ.Stream.Client.StreamInfo> streamInfos, RabbitMQ.Stream.Client.ClientParameters clientParameters, Microsoft.Extensions.Logging.ILogger logger = null) -> RabbitMQ.Stream.Client.IProducer
static RabbitMQ.Stream.Client.Reliable.Consumer.Create(RabbitMQ.Stream.Client.Reliable.ConsumerConfig consumerConfig, Microsoft.Extensions.Logging.ILogger<RabbitMQ.Stream.Client.Reliable.Consumer> logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.Reliable.Consumer>
static RabbitMQ.Stream.Client.Reliable.Producer.Create(RabbitMQ.Stream.Client.Reliable.ProducerConfig producerConfig, Microsoft.Extensions.Logging.ILogger<RabbitMQ.Stream.Client.Reliable.Producer> logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.Reliable.Producer>
static RabbitMQ.Stream.Client.StreamCompressionCodecs.GetCompressionCodec(RabbitMQ.Stream.Client.CompressionType compressionType) -> RabbitMQ.Stream.Client.ICompressionCodec
Expand Down
8 changes: 8 additions & 0 deletions RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ RabbitMQ.Stream.Client.IProducerConfig.Filter.set -> void
RabbitMQ.Stream.Client.IRouting
RabbitMQ.Stream.Client.IRouting.CreateClient(RabbitMQ.Stream.Client.ClientParameters clientParameters, RabbitMQ.Stream.Client.Broker metaInfoBroker, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IClient>
RabbitMQ.Stream.Client.IRoutingStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Threading.Tasks.Task<System.Collections.Generic.List<string>>
RabbitMQ.Stream.Client.ISuperStreamConsumer
RabbitMQ.Stream.Client.ISuperStreamConsumer.ReconnectPartition(RabbitMQ.Stream.Client.StreamInfo streamInfo) -> System.Threading.Tasks.Task
RabbitMQ.Stream.Client.ISuperStreamProducer
RabbitMQ.Stream.Client.ISuperStreamProducer.ReconnectPartition(RabbitMQ.Stream.Client.StreamInfo streamInfo) -> System.Threading.Tasks.Task
RabbitMQ.Stream.Client.KeyRoutingStrategy
RabbitMQ.Stream.Client.KeyRoutingStrategy.KeyRoutingStrategy(System.Func<RabbitMQ.Stream.Client.Message, string> routingKeyExtractor, System.Func<string, string, System.Threading.Tasks.Task<RabbitMQ.Stream.Client.RouteQueryResponse>> routingKeyQFunc, string superStream) -> void
RabbitMQ.Stream.Client.KeyRoutingStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Threading.Tasks.Task<System.Collections.Generic.List<string>>
Expand Down Expand Up @@ -239,6 +243,8 @@ RabbitMQ.Stream.Client.StreamStatsResponse.Statistic.get -> System.Collections.G
RabbitMQ.Stream.Client.StreamStatsResponse.StreamStatsResponse() -> void
RabbitMQ.Stream.Client.StreamStatsResponse.StreamStatsResponse(uint correlationId, RabbitMQ.Stream.Client.ResponseCode responseCode, System.Collections.Generic.IDictionary<string, long> statistic) -> void
RabbitMQ.Stream.Client.StreamStatsResponse.Write(System.Span<byte> span) -> int
RabbitMQ.Stream.Client.StreamSystem.CreateRawSuperStreamProducer(RabbitMQ.Stream.Client.RawSuperStreamProducerConfig rawSuperStreamProducerConfig, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ISuperStreamProducer>
RabbitMQ.Stream.Client.StreamSystem.CreateSuperStreamConsumer(RabbitMQ.Stream.Client.RawSuperStreamConsumerConfig rawSuperStreamConsumerConfig, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ISuperStreamConsumer>
RabbitMQ.Stream.Client.StreamSystem.StreamInfo(string streamName) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.StreamInfo>
RabbitMQ.Stream.Client.StreamSystem.StreamStats(string stream) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.StreamStats>
RabbitMQ.Stream.Client.StreamSystemConfig.AuthMechanism.get -> RabbitMQ.Stream.Client.AuthMechanism
Expand All @@ -255,6 +261,8 @@ static RabbitMQ.Stream.Client.Connection.Create(System.Net.EndPoint endpoint, Sy
static RabbitMQ.Stream.Client.Message.From(ref System.Buffers.ReadOnlySequence<byte> seq, uint len) -> RabbitMQ.Stream.Client.Message
static RabbitMQ.Stream.Client.RawConsumer.Create(RabbitMQ.Stream.Client.ClientParameters clientParameters, RabbitMQ.Stream.Client.RawConsumerConfig config, RabbitMQ.Stream.Client.StreamInfo metaStreamInfo, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IConsumer>
static RabbitMQ.Stream.Client.RawProducer.Create(RabbitMQ.Stream.Client.ClientParameters clientParameters, RabbitMQ.Stream.Client.RawProducerConfig config, RabbitMQ.Stream.Client.StreamInfo metaStreamInfo, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IProducer>
static RabbitMQ.Stream.Client.RawSuperStreamConsumer.Create(RabbitMQ.Stream.Client.RawSuperStreamConsumerConfig rawSuperStreamConsumerConfig, System.Collections.Generic.IDictionary<string, RabbitMQ.Stream.Client.StreamInfo> streamInfos, RabbitMQ.Stream.Client.ClientParameters clientParameters, Microsoft.Extensions.Logging.ILogger logger = null) -> RabbitMQ.Stream.Client.ISuperStreamConsumer
static RabbitMQ.Stream.Client.RawSuperStreamProducer.Create(RabbitMQ.Stream.Client.RawSuperStreamProducerConfig rawSuperStreamProducerConfig, System.Collections.Generic.IDictionary<string, RabbitMQ.Stream.Client.StreamInfo> streamInfos, RabbitMQ.Stream.Client.ClientParameters clientParameters, Microsoft.Extensions.Logging.ILogger logger = null) -> RabbitMQ.Stream.Client.ISuperStreamProducer
static RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.Create(RabbitMQ.Stream.Client.Reliable.DeduplicatingProducerConfig producerConfig, Microsoft.Extensions.Logging.ILogger<RabbitMQ.Stream.Client.Reliable.Producer> logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer>
static RabbitMQ.Stream.Client.RoutingHelper<T>.LookupLeaderConnection(RabbitMQ.Stream.Client.ClientParameters clientParameters, RabbitMQ.Stream.Client.StreamInfo metaDataInfo, RabbitMQ.Stream.Client.ConnectionsPool pool, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IClient>
static RabbitMQ.Stream.Client.RoutingHelper<T>.LookupRandomConnection(RabbitMQ.Stream.Client.ClientParameters clientParameters, RabbitMQ.Stream.Client.StreamInfo metaDataInfo, RabbitMQ.Stream.Client.ConnectionsPool pool, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IClient>
8 changes: 4 additions & 4 deletions RabbitMQ.Stream.Client/RawSuperStreamConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

namespace RabbitMQ.Stream.Client;

public class RawSuperStreamConsumer : IConsumer, IDisposable
public class RawSuperStreamConsumer : ISuperStreamConsumer, IDisposable
{
// ConcurrentDictionary because the consumer can be closed from another thread
// The send operations will check if the producer exists and if not it will be created
Expand All @@ -36,7 +36,7 @@ public class RawSuperStreamConsumer : IConsumer, IDisposable
/// <param name="clientParameters"></param>
/// <param name="logger"></param>
/// <returns></returns>
public static IConsumer Create(
public static ISuperStreamConsumer Create(
RawSuperStreamConsumerConfig rawSuperStreamConsumerConfig,
IDictionary<string, StreamInfo> streamInfos,
ClientParameters clientParameters,
Expand Down Expand Up @@ -129,11 +129,11 @@ private RawConsumerConfig FromStreamConfig(string stream)

private async Task<IConsumer> InitConsumer(string stream)
{
var index = _streamInfos.Keys.Select((item, index) => new {Item = item, Index = index})
var index = _streamInfos.Keys.Select((item, index) => new { Item = item, Index = index })
.First(i => i.Item == stream).Index;

var c = await RawConsumer.Create(
_clientParameters with {ClientProvidedName = $"{_clientParameters.ClientProvidedName}_{index}"},
_clientParameters with { ClientProvidedName = $"{_clientParameters.ClientProvidedName}_{index}" },
FromStreamConfig(stream), _streamInfos[stream], _logger).ConfigureAwait(false);
_logger?.LogDebug("Super stream consumer {ConsumerReference} created for Stream {StreamIdentifier}", c.Info,
stream);
Expand Down
48 changes: 32 additions & 16 deletions RabbitMQ.Stream.Client/RawSuperStreamProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ namespace RabbitMQ.Stream.Client;
/// When a message is sent to a stream, the producer will be selected based on the stream name and the partition key.
/// SuperStreamProducer uses lazy initialization for the producers, when it starts there are no producers until the first message is sent.
/// </summary>
public class RawSuperStreamProducer : IProducer, IDisposable
public class RawSuperStreamProducer : ISuperStreamProducer, IDisposable
{
private bool _disposed;

Expand All @@ -46,7 +46,7 @@ public class RawSuperStreamProducer : IProducer, IDisposable
private readonly ILogger _logger;
private readonly SemaphoreSlim _semaphoreSlim = new SemaphoreSlim(1, 1);

public static IProducer Create(
public static ISuperStreamProducer Create(
RawSuperStreamProducerConfig rawSuperStreamProducerConfig,
IDictionary<string, StreamInfo> streamInfos,
ClientParameters clientParameters,
Expand Down Expand Up @@ -129,10 +129,10 @@ private RawProducerConfig FromStreamConfig(string stream)
// The producer is created on demand when a message is sent to a stream
private async Task<IProducer> InitProducer(string stream)
{
var index = _streamInfos.Keys.Select((item, index) => new { Item = item, Index = index })
var index = _streamInfos.Keys.Select((item, index) => new {Item = item, Index = index})
.First(i => i.Item == stream).Index;
var p = await RawProducer.Create(
_clientParameters with { ClientProvidedName = $"{_config.ClientProvidedName}_{index}" },
_clientParameters with {ClientProvidedName = $"{_config.ClientProvidedName}_{index}"},
FromStreamConfig(stream),
_streamInfos[stream],
_logger)
Expand Down Expand Up @@ -190,7 +190,7 @@ private async Task<IProducer> GetProducerForMessage(Message message)

// we should always have a route
// but in case of stream KEY the routing could not exist
if (routes is not { Count: > 0 })
if (routes is not {Count: > 0})
{
throw new RouteNotFoundException("No route found for the message to any stream");
}
Expand Down Expand Up @@ -239,13 +239,21 @@ public async ValueTask Send(List<(ulong, Message)> messages)
}
else
{
aggregate.Add((p, new List<(ulong, Message)>() { (subMessage.Item1, subMessage.Item2) }));
aggregate.Add((p, new List<(ulong, Message)>() {(subMessage.Item1, subMessage.Item2)}));
}
}

foreach (var (producer, list) in aggregate)
await _semaphoreSlim.WaitAsync().ConfigureAwait(false);
try
{
foreach (var (producer, list) in aggregate)
{
await producer.Send(list).ConfigureAwait(false);
}
}
finally
{
await producer.Send(list).ConfigureAwait(false);
_semaphoreSlim.Release();
}
}

Expand All @@ -266,15 +274,23 @@ public async ValueTask Send(ulong publishingId, List<Message> subEntryMessages,
}
else
{
aggregate.Add((p, new List<Message>() { subMessage }));
aggregate.Add((p, new List<Message>() {subMessage}));
}
}

// Here we send the messages to the right producer
// sub aggregate is a list of messages that have to be sent to the same producer
foreach (var (producer, messages) in aggregate)
await _semaphoreSlim.WaitAsync().ConfigureAwait(false);
try
{
// Here we send the messages to the right producer
// sub aggregate is a list of messages that have to be sent to the same producer
foreach (var (producer, messages) in aggregate)
{
await producer.Send(publishingId, messages, compressionType).ConfigureAwait(false);
}
}
finally
{
await producer.Send(publishingId, messages, compressionType).ConfigureAwait(false);
_semaphoreSlim.Release();
}
}

Expand Down Expand Up @@ -403,7 +419,7 @@ public Task<List<string>> Route(Message message, List<string> partitions)
var key = _routingKeyExtractor(message);
var hash = new Murmur32ManagedX86(Seed).ComputeHash(Encoding.UTF8.GetBytes(key));
var index = BitConverter.ToUInt32(hash, 0) % (uint)partitions.Count;
var r = new List<string>() { partitions[(int)index] };
var r = new List<string>() {partitions[(int)index]};
return Task.FromResult(r);
}

Expand Down Expand Up @@ -436,8 +452,8 @@ public async Task<List<string>> Route(Message message, List<string> partitions)
var c = await _routingKeyQFunc(_superStream, key).ConfigureAwait(false);
_cacheStream[key] = c.Streams;
return (from resultStream in c.Streams
where partitions.Contains(resultStream)
select new List<string>() { resultStream }).FirstOrDefault();
where partitions.Contains(resultStream)
select new List<string>() {resultStream}).FirstOrDefault();
}

public KeyRoutingStrategy(Func<Message, string> routingKeyExtractor,
Expand Down
4 changes: 2 additions & 2 deletions RabbitMQ.Stream.Client/StreamSystem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ private static void CheckLeader(StreamInfo metaStreamInfo)
}
}

public async Task<IProducer> CreateRawSuperStreamProducer(
public async Task<ISuperStreamProducer> CreateRawSuperStreamProducer(
RawSuperStreamProducerConfig rawSuperStreamProducerConfig, ILogger logger = null)
{
await MayBeReconnectLocator().ConfigureAwait(false);
Expand Down Expand Up @@ -222,7 +222,7 @@ public async Task<string[]> QueryPartition(string superStream)
return partitions.Streams;
}

public async Task<IConsumer> CreateSuperStreamConsumer(
public async Task<ISuperStreamConsumer> CreateSuperStreamConsumer(
RawSuperStreamConsumerConfig rawSuperStreamConsumerConfig,
ILogger logger = null)
{
Expand Down
Loading

0 comments on commit 6e5b582

Please sign in to comment.