Skip to content

Commit

Permalink
event bus WIP
Browse files Browse the repository at this point in the history
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
  • Loading branch information
Gsantomaggio committed Oct 16, 2023
1 parent 7e04140 commit 93162cd
Show file tree
Hide file tree
Showing 15 changed files with 332 additions and 25 deletions.
4 changes: 3 additions & 1 deletion RabbitMQ.Stream.Client/EntityInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ namespace RabbitMQ.Stream.Client;
public abstract class Info
{
public string Stream { get; }
public string ClientProvidedName { get; }

protected Info(string stream)
protected Info(string stream, string clientProvidedName)
{
Stream = stream;
ClientProvidedName = clientProvidedName;
}
}
68 changes: 68 additions & 0 deletions RabbitMQ.Stream.Client/EventBus/StreamEvents.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,73 @@
// 2.0, and the Mozilla Public License, version 2.0.
// Copyright (c) 2007-2023 VMware, Inc.

using System.Collections.Generic;
using RabbitMQ.Stream.Client.Reliable;

namespace RabbitMQ.Stream.Client.EventBus;

public abstract class ClientEvent : IStreamEvent
{
protected ClientEvent(IDictionary<string, string> connectionProperties, ClientParameters parameters,
EventTypes eventType, EventSeverity eventSeverity)
{
ConnectionProperties = connectionProperties;
Parameters = parameters;
EventType = eventType;
EventSeverity = eventSeverity;
}

public EventTypes EventType { get; internal set; }
public EventSeverity EventSeverity { get; internal set; }

public ClientParameters Parameters { get; }
public IDictionary<string, string> ConnectionProperties { get; }
}

public class RawProducerConnected : ClientEvent
{
public RawProducerConnected(IDictionary<string, string> connectionProperties, ClientParameters parameters,
RawProducer instance)
: base(connectionProperties, parameters, EventTypes.Connection, EventSeverity.Info)
{
Instance = instance;
}

public RawProducer Instance { get; }
}

public class RawProducerDisconnected : ClientEvent
{
public RawProducerDisconnected(IDictionary<string, string> connectionProperties,
ClientParameters parameters, RawProducer instance)
: base(connectionProperties, parameters, EventTypes.Disconnection, EventSeverity.Info)
{
Instance = instance;
}

public RawProducer Instance { get; }
}

public class ReliableBaseReconnected : IStreamEvent
{
public ReliableBaseReconnected(bool isReconnection, EventSeverity eventSeverity)
{
IsReconnection = isReconnection;
EventSeverity = eventSeverity;
}

public bool IsReconnection { get; }
public EventTypes EventType { get; } = EventTypes.Reconnection;
public EventSeverity EventSeverity { get; }
}

public class ProducerReconnected : ReliableBaseReconnected
{
public Producer Instance { get; }

public ProducerReconnected(bool isReconnection, EventSeverity eventSeverity, Producer instance) : base(
isReconnection, eventSeverity)
{
Instance = instance;
}
}
3 changes: 2 additions & 1 deletion RabbitMQ.Stream.Client/IConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ public ushort InitialCredits
public class ConsumerInfo : Info
{
public string Reference { get; }
public ConsumerInfo(string stream, string reference) : base(stream)

public ConsumerInfo(string stream, string reference, string clientProvidedName) : base(stream, clientProvidedName)
{
Reference = reference;
}
Expand Down
5 changes: 4 additions & 1 deletion RabbitMQ.Stream.Client/IProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using RabbitMQ.Stream.Client.EventBus;

namespace RabbitMQ.Stream.Client;

Expand Down Expand Up @@ -103,6 +104,8 @@ public record IProducerConfig : INamedEntity
/// Filter enables the chunk filter feature.
/// </summary>
public ProducerFilter Filter { get; set; } = null;

public IEventBus Events { get; set; }
}

/// <summary>
Expand All @@ -112,7 +115,7 @@ public class ProducerInfo : Info
{
public string Reference { get; }

public ProducerInfo(string stream, string reference) : base(stream)
public ProducerInfo(string stream, string reference, string clientProvidedName) : base(stream, clientProvidedName)
{
Reference = reference;
}
Expand Down
33 changes: 30 additions & 3 deletions RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,16 @@ RabbitMQ.Stream.Client.ConsumerFilter.PostFilter.set -> void
RabbitMQ.Stream.Client.ConsumerFilter.Values.get -> System.Collections.Generic.List<string>
RabbitMQ.Stream.Client.ConsumerFilter.Values.set -> void
RabbitMQ.Stream.Client.ConsumerInfo
RabbitMQ.Stream.Client.ConsumerInfo.ConsumerInfo(string stream, string reference) -> void
RabbitMQ.Stream.Client.ConsumerInfo.ConsumerInfo(string stream, string reference, string clientProvidedName) -> void
RabbitMQ.Stream.Client.ConsumerInfo.Reference.get -> string
RabbitMQ.Stream.Client.CrcException
RabbitMQ.Stream.Client.CrcException.CrcException(string s) -> void
RabbitMQ.Stream.Client.EventBus.ClientEvent
RabbitMQ.Stream.Client.EventBus.ClientEvent.ClientEvent(System.Collections.Generic.IDictionary<string, string> connectionProperties, RabbitMQ.Stream.Client.ClientParameters parameters, RabbitMQ.Stream.Client.EventBus.EventTypes eventType, RabbitMQ.Stream.Client.EventBus.EventSeverity eventSeverity) -> void
RabbitMQ.Stream.Client.EventBus.ClientEvent.ConnectionProperties.get -> System.Collections.Generic.IDictionary<string, string>
RabbitMQ.Stream.Client.EventBus.ClientEvent.EventSeverity.get -> RabbitMQ.Stream.Client.EventBus.EventSeverity
RabbitMQ.Stream.Client.EventBus.ClientEvent.EventType.get -> RabbitMQ.Stream.Client.EventBus.EventTypes
RabbitMQ.Stream.Client.EventBus.ClientEvent.Parameters.get -> RabbitMQ.Stream.Client.ClientParameters
RabbitMQ.Stream.Client.EventBus.EventSeverity
RabbitMQ.Stream.Client.EventBus.EventSeverity.Error = 2 -> RabbitMQ.Stream.Client.EventBus.EventSeverity
RabbitMQ.Stream.Client.EventBus.EventSeverity.Info = 0 -> RabbitMQ.Stream.Client.EventBus.EventSeverity
Expand All @@ -41,6 +47,20 @@ RabbitMQ.Stream.Client.EventBus.IEventBus.Subscribe<T>(System.Func<T, System.Thr
RabbitMQ.Stream.Client.EventBus.IStreamEvent
RabbitMQ.Stream.Client.EventBus.IStreamEvent.EventSeverity.get -> RabbitMQ.Stream.Client.EventBus.EventSeverity
RabbitMQ.Stream.Client.EventBus.IStreamEvent.EventType.get -> RabbitMQ.Stream.Client.EventBus.EventTypes
RabbitMQ.Stream.Client.EventBus.ProducerReconnected
RabbitMQ.Stream.Client.EventBus.ProducerReconnected.Instance.get -> RabbitMQ.Stream.Client.Reliable.Producer
RabbitMQ.Stream.Client.EventBus.ProducerReconnected.ProducerReconnected(bool isReconnection, RabbitMQ.Stream.Client.EventBus.EventSeverity eventSeverity, RabbitMQ.Stream.Client.Reliable.Producer instance) -> void
RabbitMQ.Stream.Client.EventBus.RawProducerConnected
RabbitMQ.Stream.Client.EventBus.RawProducerConnected.Instance.get -> RabbitMQ.Stream.Client.RawProducer
RabbitMQ.Stream.Client.EventBus.RawProducerConnected.RawProducerConnected(System.Collections.Generic.IDictionary<string, string> connectionProperties, RabbitMQ.Stream.Client.ClientParameters parameters, RabbitMQ.Stream.Client.RawProducer instance) -> void
RabbitMQ.Stream.Client.EventBus.RawProducerDisconnected
RabbitMQ.Stream.Client.EventBus.RawProducerDisconnected.Instance.get -> RabbitMQ.Stream.Client.RawProducer
RabbitMQ.Stream.Client.EventBus.RawProducerDisconnected.RawProducerDisconnected(System.Collections.Generic.IDictionary<string, string> connectionProperties, RabbitMQ.Stream.Client.ClientParameters parameters, RabbitMQ.Stream.Client.RawProducer instance) -> void
RabbitMQ.Stream.Client.EventBus.ReliableBaseReconnected
RabbitMQ.Stream.Client.EventBus.ReliableBaseReconnected.EventSeverity.get -> RabbitMQ.Stream.Client.EventBus.EventSeverity
RabbitMQ.Stream.Client.EventBus.ReliableBaseReconnected.EventType.get -> RabbitMQ.Stream.Client.EventBus.EventTypes
RabbitMQ.Stream.Client.EventBus.ReliableBaseReconnected.IsReconnection.get -> bool
RabbitMQ.Stream.Client.EventBus.ReliableBaseReconnected.ReliableBaseReconnected(bool isReconnection, RabbitMQ.Stream.Client.EventBus.EventSeverity eventSeverity) -> void
RabbitMQ.Stream.Client.EventBus.StreamEventsBus
RabbitMQ.Stream.Client.EventBus.StreamEventsBus.Publish<T>(T v) -> void
RabbitMQ.Stream.Client.EventBus.StreamEventsBus.StreamEventsBus() -> void
Expand Down Expand Up @@ -78,9 +98,12 @@ RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.set -> void
RabbitMQ.Stream.Client.ICrc32
RabbitMQ.Stream.Client.ICrc32.Hash(byte[] data) -> byte[]
RabbitMQ.Stream.Client.Info
RabbitMQ.Stream.Client.Info.Info(string stream) -> void
RabbitMQ.Stream.Client.Info.ClientProvidedName.get -> string
RabbitMQ.Stream.Client.Info.Info(string stream, string clientProvidedName) -> void
RabbitMQ.Stream.Client.Info.Stream.get -> string
RabbitMQ.Stream.Client.IProducer.Info.get -> RabbitMQ.Stream.Client.ProducerInfo
RabbitMQ.Stream.Client.IProducerConfig.Events.get -> RabbitMQ.Stream.Client.EventBus.IEventBus
RabbitMQ.Stream.Client.IProducerConfig.Events.set -> void
RabbitMQ.Stream.Client.IProducerConfig.Filter.get -> RabbitMQ.Stream.Client.ProducerFilter
RabbitMQ.Stream.Client.IProducerConfig.Filter.set -> void
RabbitMQ.Stream.Client.IRoutingStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Threading.Tasks.Task<System.Collections.Generic.List<string>>
Expand All @@ -93,7 +116,7 @@ RabbitMQ.Stream.Client.ProducerFilter
RabbitMQ.Stream.Client.ProducerFilter.FilterValue.get -> System.Func<RabbitMQ.Stream.Client.Message, string>
RabbitMQ.Stream.Client.ProducerFilter.FilterValue.set -> void
RabbitMQ.Stream.Client.ProducerInfo
RabbitMQ.Stream.Client.ProducerInfo.ProducerInfo(string stream, string reference) -> void
RabbitMQ.Stream.Client.ProducerInfo.ProducerInfo(string stream, string reference, string clientProvidedName) -> void
RabbitMQ.Stream.Client.ProducerInfo.Reference.get -> string
RabbitMQ.Stream.Client.PublishFilter
RabbitMQ.Stream.Client.PublishFilter.Command.get -> ushort
Expand Down Expand Up @@ -128,6 +151,10 @@ RabbitMQ.Stream.Client.Reliable.Producer.Info.get -> RabbitMQ.Stream.Client.Prod
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Filter.get -> RabbitMQ.Stream.Client.ProducerFilter
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Filter.set -> void
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Reference.set -> void
RabbitMQ.Stream.Client.Reliable.ReliableBase.OnReconnected.get -> System.Func<bool, RabbitMQ.Stream.Client.EventBus.EventSeverity, System.Threading.Tasks.Task>
RabbitMQ.Stream.Client.Reliable.ReliableBase.OnReconnected.set -> void
RabbitMQ.Stream.Client.Reliable.ReliableConfig.Events.get -> RabbitMQ.Stream.Client.EventBus.IEventBus
RabbitMQ.Stream.Client.Reliable.ReliableConfig.Events.set -> void
RabbitMQ.Stream.Client.Reliable.SuperStreamConfig.RoutingStrategyType.get -> RabbitMQ.Stream.Client.RoutingStrategyType
RabbitMQ.Stream.Client.Reliable.SuperStreamConfig.RoutingStrategyType.set -> void
RabbitMQ.Stream.Client.RouteNotFoundException
Expand Down
2 changes: 1 addition & 1 deletion RabbitMQ.Stream.Client/RawConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ private RawConsumer(Client client, RawConsumerConfig config, ILogger logger = nu
_initialCredits = config.InitialCredits;
_config = config;
_logger.LogDebug("Creating... {ConsumerInfo}", ConsumerInfo());
Info = new ConsumerInfo(_config.Stream, _config.Reference);
Info = new ConsumerInfo(_config.Stream, _config.Reference,config.ClientProvidedName);
// _chunksBuffer is a channel that is used to buffer the chunks
_chunksBuffer = Channel.CreateBounded<Chunk>(new BoundedChannelOptions(_initialCredits)
{
Expand Down
11 changes: 9 additions & 2 deletions RabbitMQ.Stream.Client/RawProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using RabbitMQ.Stream.Client.EventBus;

namespace RabbitMQ.Stream.Client
{
Expand All @@ -22,7 +23,7 @@ public struct Confirmation

public string Stream { get; set; }
}

public record RawProducerConfig : IProducerConfig
{
public string Stream { get; }
Expand Down Expand Up @@ -71,6 +72,8 @@ public static async Task<IProducer> Create(
.ConfigureAwait(false);
var producer = new RawProducer((Client)client, config, logger);
await producer.Init().ConfigureAwait(false);
config.Events?.Publish(new RawProducerConnected(client.ConnectionProperties,
client.Parameters, producer));

return producer;
}
Expand All @@ -79,7 +82,7 @@ private RawProducer(Client client, RawProducerConfig config, ILogger logger = nu
{
_client = client;
_config = config;
Info = new ProducerInfo(_config.Stream, _config.Reference);
Info = new ProducerInfo(_config.Stream, _config.Reference,_config.ClientProvidedName);
_messageBuffer = Channel.CreateBounded<OutgoingMsg>(new BoundedChannelOptions(10000)
{
AllowSynchronousContinuations = false,
Expand All @@ -102,10 +105,14 @@ private async Task Init()
_client.ConnectionClosed += async reason =>
{
await Close().ConfigureAwait(false);
_config.Events?.Publish(new RawProducerDisconnected(_client.ConnectionProperties,
_client.Parameters, this));
if (_config.ConnectionClosedHandler != null)
{
await _config.ConnectionClosedHandler(reason).ConfigureAwait(false);
}

Check failure on line 114 in RabbitMQ.Stream.Client/RawProducer.cs

View workflow job for this annotation

GitHub Actions / call-build-test / build/test on ubuntu-latest

Avoid multiple blank lines

Check failure on line 114 in RabbitMQ.Stream.Client/RawProducer.cs

View workflow job for this annotation

GitHub Actions / call-build-test / build/test on ubuntu-latest

Avoid multiple blank lines

Check failure on line 114 in RabbitMQ.Stream.Client/RawProducer.cs

View workflow job for this annotation

GitHub Actions / call-build-test / build/test on windows-latest

Avoid multiple blank lines

Check failure on line 114 in RabbitMQ.Stream.Client/RawProducer.cs

View workflow job for this annotation

GitHub Actions / call-build-test / build/test on windows-latest

Avoid multiple blank lines
};

Check failure on line 116 in RabbitMQ.Stream.Client/RawProducer.cs

View workflow job for this annotation

GitHub Actions / call-build-test / build/test on ubuntu-latest

Consecutive braces must not have blank line between them

Check failure on line 116 in RabbitMQ.Stream.Client/RawProducer.cs

View workflow job for this annotation

GitHub Actions / call-build-test / build/test on ubuntu-latest

Consecutive braces must not have blank line between them

Check failure on line 116 in RabbitMQ.Stream.Client/RawProducer.cs

View workflow job for this annotation

GitHub Actions / call-build-test / build/test on windows-latest

Consecutive braces must not have blank line between them

Check failure on line 116 in RabbitMQ.Stream.Client/RawProducer.cs

View workflow job for this annotation

GitHub Actions / call-build-test / build/test on windows-latest

Consecutive braces must not have blank line between them

if (_config.MetadataHandler != null)
Expand Down
6 changes: 3 additions & 3 deletions RabbitMQ.Stream.Client/RawSuperStreamConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ private RawSuperStreamConsumer(
_streamInfos = streamInfos;
_clientParameters = clientParameters;
_logger = logger ?? NullLogger.Instance;
Info = new ConsumerInfo(_config.SuperStream, _config.Reference);
Info = new ConsumerInfo(_config.SuperStream, _config.Reference, _config.ClientProvidedName);

StartConsumers().Wait(CancellationToken.None);
}
Expand Down Expand Up @@ -144,7 +144,7 @@ await Task.Run(async () =>
_config.Reference,
update.Stream
);
var x = await _config.Client.QueryMetadata(new[] { update.Stream }).ConfigureAwait(false);
var x = await _config.Client.QueryMetadata(new[] {update.Stream}).ConfigureAwait(false);
x.StreamInfos.TryGetValue(update.Stream, out var streamInfo);
_streamInfos.Add(update.Stream, streamInfo);
await GetConsumer(update.Stream).ConfigureAwait(false);
Expand All @@ -158,7 +158,7 @@ await Task.Run(async () =>
private async Task<IConsumer> InitConsumer(string stream)
{
var c = await RawConsumer.Create(
_clientParameters with { ClientProvidedName = _clientParameters.ClientProvidedName },
_clientParameters with {ClientProvidedName = _clientParameters.ClientProvidedName},
FromStreamConfig(stream), _streamInfos[stream], _logger).ConfigureAwait(false);
_logger?.LogDebug("Consumer {ConsumerReference} created for Stream {StreamIdentifier}", _config.Reference,
stream);
Expand Down
21 changes: 13 additions & 8 deletions RabbitMQ.Stream.Client/RawSuperStreamProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ private RawSuperStreamProducer(
_config = config;
_streamInfos = streamInfos;
_clientParameters = clientParameters;
Info = new ProducerInfo(config.SuperStream, config.Reference);
Info = new ProducerInfo(config.SuperStream, config.Reference, config.ClientProvidedName);
_defaultRoutingConfiguration.RoutingStrategy = _config.RoutingStrategyType switch
{
RoutingStrategyType.Key => new KeyRoutingStrategy(_config.Routing,
Expand All @@ -88,6 +88,7 @@ private RawProducerConfig FromStreamConfig(string stream)
Reference = _config.Reference,
MaxInFlight = _config.MaxInFlight,
Filter = _config.Filter,
Events = _config.Events,
ConnectionClosedHandler = s =>
{
// In case of connection closed, we need to remove the producer from the list
Expand Down Expand Up @@ -129,7 +130,11 @@ private RawProducerConfig FromStreamConfig(string stream)
// The producer is created on demand when a message is sent to a stream
private async Task<IProducer> InitProducer(string stream)
{
var p = await RawProducer.Create(_clientParameters, FromStreamConfig(stream), _streamInfos[stream], _logger)
var c = _clientParameters with
{
ClientProvidedName = _clientParameters.ClientProvidedName + "#" + stream
};
var p = await RawProducer.Create(c, FromStreamConfig(stream), _streamInfos[stream], _logger)
.ConfigureAwait(false);
_logger?.LogDebug("Producer {ProducerReference} created for Stream {StreamIdentifier}", _config.Reference,
stream);
Expand Down Expand Up @@ -160,7 +165,7 @@ private async Task<IProducer> GetProducerForMessage(Message message)

// we should always have a route
// but in case of stream KEY the routing could not exist
if (routes is not { Count: > 0 })
if (routes is not {Count: > 0})
{
throw new RouteNotFoundException("No route found for the message to any stream");
}
Expand Down Expand Up @@ -191,7 +196,7 @@ public async ValueTask Send(List<(ulong, Message)> messages)
}
else
{
aggregate.Add((p, new List<(ulong, Message)>() { (subMessage.Item1, subMessage.Item2) }));
aggregate.Add((p, new List<(ulong, Message)>() {(subMessage.Item1, subMessage.Item2)}));
}
}

Expand All @@ -217,7 +222,7 @@ public async ValueTask Send(ulong publishingId, List<Message> subEntryMessages,
}
else
{
aggregate.Add((p, new List<Message>() { subMessage }));
aggregate.Add((p, new List<Message>() {subMessage}));
}
}

Expand Down Expand Up @@ -352,7 +357,7 @@ public Task<List<string>> Route(Message message, List<string> partitions)
var key = _routingKeyExtractor(message);
var hash = new Murmur32ManagedX86(Seed).ComputeHash(Encoding.UTF8.GetBytes(key));
var index = BitConverter.ToUInt32(hash, 0) % (uint)partitions.Count;
var r = new List<string>() { partitions[(int)index] };
var r = new List<string>() {partitions[(int)index]};
return Task.FromResult(r);
}

Expand Down Expand Up @@ -385,8 +390,8 @@ public async Task<List<string>> Route(Message message, List<string> partitions)
var c = await _routingKeyQFunc(_superStream, key).ConfigureAwait(false);
_cacheStream[key] = c.Streams;
return (from resultStream in c.Streams
where partitions.Contains(resultStream)
select new List<string>() { resultStream }).FirstOrDefault();
where partitions.Contains(resultStream)
select new List<string>() {resultStream}).FirstOrDefault();
}

public KeyRoutingStrategy(Func<Message, string> routingKeyExtractor,
Expand Down
2 changes: 1 addition & 1 deletion RabbitMQ.Stream.Client/Reliable/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ internal Consumer(ConsumerConfig consumerConfig, ILogger<Consumer> logger = null
{
_logger = logger ?? NullLogger<Consumer>.Instance;
_consumerConfig = consumerConfig;
Info = new ConsumerInfo(consumerConfig.Stream, consumerConfig.Reference);
Info = new ConsumerInfo(consumerConfig.Stream, consumerConfig.Reference,consumerConfig.ClientProvidedName);
}

public static async Task<Consumer> Create(ConsumerConfig consumerConfig, ILogger<Consumer> logger = null)
Expand Down
9 changes: 8 additions & 1 deletion RabbitMQ.Stream.Client/Reliable/Producer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using RabbitMQ.Stream.Client.EventBus;

namespace RabbitMQ.Stream.Client.Reliable;

Expand Down Expand Up @@ -138,8 +139,14 @@ private Producer(ProducerConfig producerConfig, ILogger<Producer> logger = null)
producerConfig.TimeoutMessageAfter,
producerConfig.MaxInFlight
);
Info = new ProducerInfo(producerConfig.Stream, producerConfig.Reference);
Info = new ProducerInfo(producerConfig.Stream, producerConfig.Reference, producerConfig.ClientProvidedName);
_logger = logger ?? NullLogger<Producer>.Instance;
OnReconnected = (isReconnection, eventSeverity) =>
{
producerConfig.Events?.Publish(new ProducerReconnected(isReconnection, eventSeverity, this));
return Task.CompletedTask;
};
}

/// <summary>
Expand Down
Loading

0 comments on commit 93162cd

Please sign in to comment.