Skip to content

Commit

Permalink
formatting
Browse files Browse the repository at this point in the history
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
  • Loading branch information
Gsantomaggio committed Oct 17, 2023
1 parent 93162cd commit 3ed749c
Show file tree
Hide file tree
Showing 6 changed files with 12 additions and 14 deletions.
2 changes: 1 addition & 1 deletion RabbitMQ.Stream.Client/RawConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ private RawConsumer(Client client, RawConsumerConfig config, ILogger logger = nu
_initialCredits = config.InitialCredits;
_config = config;
_logger.LogDebug("Creating... {ConsumerInfo}", ConsumerInfo());
Info = new ConsumerInfo(_config.Stream, _config.Reference,config.ClientProvidedName);
Info = new ConsumerInfo(_config.Stream, _config.Reference, config.ClientProvidedName);
// _chunksBuffer is a channel that is used to buffer the chunks
_chunksBuffer = Channel.CreateBounded<Chunk>(new BoundedChannelOptions(_initialCredits)
{
Expand Down
4 changes: 1 addition & 3 deletions RabbitMQ.Stream.Client/RawProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ private RawProducer(Client client, RawProducerConfig config, ILogger logger = nu
{
_client = client;
_config = config;
Info = new ProducerInfo(_config.Stream, _config.Reference,_config.ClientProvidedName);
Info = new ProducerInfo(_config.Stream, _config.Reference, _config.ClientProvidedName);
_messageBuffer = Channel.CreateBounded<OutgoingMsg>(new BoundedChannelOptions(10000)
{
AllowSynchronousContinuations = false,
Expand Down Expand Up @@ -111,8 +111,6 @@ private async Task Init()
{
await _config.ConnectionClosedHandler(reason).ConfigureAwait(false);
}
};

if (_config.MetadataHandler != null)
Expand Down
4 changes: 2 additions & 2 deletions RabbitMQ.Stream.Client/RawSuperStreamConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ await Task.Run(async () =>
_config.Reference,
update.Stream
);
var x = await _config.Client.QueryMetadata(new[] {update.Stream}).ConfigureAwait(false);
var x = await _config.Client.QueryMetadata(new[] { update.Stream }).ConfigureAwait(false);
x.StreamInfos.TryGetValue(update.Stream, out var streamInfo);
_streamInfos.Add(update.Stream, streamInfo);
await GetConsumer(update.Stream).ConfigureAwait(false);
Expand All @@ -158,7 +158,7 @@ await Task.Run(async () =>
private async Task<IConsumer> InitConsumer(string stream)
{
var c = await RawConsumer.Create(
_clientParameters with {ClientProvidedName = _clientParameters.ClientProvidedName},
_clientParameters with { ClientProvidedName = _clientParameters.ClientProvidedName },
FromStreamConfig(stream), _streamInfos[stream], _logger).ConfigureAwait(false);
_logger?.LogDebug("Consumer {ConsumerReference} created for Stream {StreamIdentifier}", _config.Reference,
stream);
Expand Down
12 changes: 6 additions & 6 deletions RabbitMQ.Stream.Client/RawSuperStreamProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ private async Task<IProducer> GetProducerForMessage(Message message)

// we should always have a route
// but in case of stream KEY the routing could not exist
if (routes is not {Count: > 0})
if (routes is not { Count: > 0 })
{
throw new RouteNotFoundException("No route found for the message to any stream");
}
Expand Down Expand Up @@ -196,7 +196,7 @@ public async ValueTask Send(List<(ulong, Message)> messages)
}
else
{
aggregate.Add((p, new List<(ulong, Message)>() {(subMessage.Item1, subMessage.Item2)}));
aggregate.Add((p, new List<(ulong, Message)>() { (subMessage.Item1, subMessage.Item2) }));
}
}

Expand All @@ -222,7 +222,7 @@ public async ValueTask Send(ulong publishingId, List<Message> subEntryMessages,
}
else
{
aggregate.Add((p, new List<Message>() {subMessage}));
aggregate.Add((p, new List<Message>() { subMessage }));
}
}

Expand Down Expand Up @@ -357,7 +357,7 @@ public Task<List<string>> Route(Message message, List<string> partitions)
var key = _routingKeyExtractor(message);
var hash = new Murmur32ManagedX86(Seed).ComputeHash(Encoding.UTF8.GetBytes(key));
var index = BitConverter.ToUInt32(hash, 0) % (uint)partitions.Count;
var r = new List<string>() {partitions[(int)index]};
var r = new List<string>() { partitions[(int)index] };
return Task.FromResult(r);
}

Expand Down Expand Up @@ -390,8 +390,8 @@ public async Task<List<string>> Route(Message message, List<string> partitions)
var c = await _routingKeyQFunc(_superStream, key).ConfigureAwait(false);
_cacheStream[key] = c.Streams;
return (from resultStream in c.Streams
where partitions.Contains(resultStream)
select new List<string>() {resultStream}).FirstOrDefault();
where partitions.Contains(resultStream)
select new List<string>() { resultStream }).FirstOrDefault();
}

public KeyRoutingStrategy(Func<Message, string> routingKeyExtractor,
Expand Down
2 changes: 1 addition & 1 deletion RabbitMQ.Stream.Client/Reliable/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ internal Consumer(ConsumerConfig consumerConfig, ILogger<Consumer> logger = null
{
_logger = logger ?? NullLogger<Consumer>.Instance;
_consumerConfig = consumerConfig;
Info = new ConsumerInfo(consumerConfig.Stream, consumerConfig.Reference,consumerConfig.ClientProvidedName);
Info = new ConsumerInfo(consumerConfig.Stream, consumerConfig.Reference, consumerConfig.ClientProvidedName);
}

public static async Task<Consumer> Create(ConsumerConfig consumerConfig, ILogger<Consumer> logger = null)
Expand Down
2 changes: 1 addition & 1 deletion RabbitMQ.Stream.Client/Reliable/Producer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ private Producer(ProducerConfig producerConfig, ILogger<Producer> logger = null)
_logger = logger ?? NullLogger<Producer>.Instance;
OnReconnected = (isReconnection, eventSeverity) =>
{
producerConfig.Events?.Publish(new ProducerReconnected(isReconnection, eventSeverity, this));
return Task.CompletedTask;
};
Expand Down

0 comments on commit 3ed749c

Please sign in to comment.