v1.8.0-rc.3
Pre-release
Enhancements
- Consumer connected only to the followers by @Gsantomaggio in #352
- Check the entity status during the close by @Gsantomaggio in #353
- Add Chunk info consumer side by @Gsantomaggio in #355
- Remove code duplication to handle the pool by @Gsantomaggio in #356
Full Changelog: v1.8.0-rc.2...v1.8.0-rc.3
What's new in 1.8
The 1.8 release focuses on:
- Multiple Consumers and Producers per connection
- Improved reconnection for streams and super streams
The high-level classes Consumer and Producer don't introduce breaking changes.
The RawSuperStream* classes change their default behaviour. Please see section 3.
1. Multiple Consumers and Producers per connection
The RabbitMQ stream protocol supports multiple producers and multiple consumers per TCP connection.
This version introduces a connection pool for Consumers and Producers.
There is a new ConnectionPoolConfig setting:
var config = new StreamSystemConfig
{
    ConnectionPoolConfig = new ConnectionPoolConfig()
    {
        ConsumersPerConnection = 10,
        ProducersPerConnection = 10,
    }
};
- ConsumersPerConnection: the number of consumers per connection (min 1, max 200, default 1)
- ProducersPerConnection: the number of producers per connection (min 1, max 200, default 1)
Each connection can handle different streams; see the image:
Performance
Sharing the same connection for multiple streams reduces the number of connections, but it can affect performance:
- Consumer side: if one consumer is slow, it can slow down the other consumers on the same connection
- Producer side: if all the producers publish at full rate, sharing a connection can reduce throughput
The right values depend on your environment.
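To get a feel for the trade-off, here is a minimal sketch that estimates how many connections a given setting needs. `ConnectionEstimate` and `MinConnections` are hypothetical names, not part of the client library. The sketch assumes entities are simply packed onto connections up to the configured limit. The real pool may open more connections, for example when streams live on different broker nodes, so treat the result as a rough lower bound.

```csharp
using System;

public static class ConnectionEstimate
{
    // Hypothetical helper, not part of RabbitMQ.Stream.Client.
    // Assumption: entities (consumers or producers) are packed onto
    // connections up to the per-connection limit. Broker topology is ignored.
    public static int MinConnections(int entities, int perConnection)
    {
        if (entities < 0)
            throw new ArgumentOutOfRangeException(nameof(entities));

        // The release notes state a valid range of 1 to 200.
        if (perConnection < 1 || perConnection > 200)
            throw new ArgumentOutOfRangeException(nameof(perConnection));

        // Integer ceiling division.
        return (entities + perConnection - 1) / perConnection;
    }
}
```

Under these assumptions, 100 consumers need at least 100 connections with a limit of 1, 10 connections with a limit of 10, and 2 connections with a limit of 50.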
Tip
You can use different StreamSystemConfig instances, for example:
var configToReduceTheConnections = new StreamSystemConfig
{
    ConnectionPoolConfig = new ConnectionPoolConfig()
    {
        ConsumersPerConnection = 50, // high value
        ProducersPerConnection = 50, // high value
    }
};
var configToIncreaseThePerformances = new StreamSystemConfig
{
    ConnectionPoolConfig = new ConnectionPoolConfig()
    {
        ConsumersPerConnection = 1, // low value
        ProducersPerConnection = 1, // low value
    }
};
Any value from 1 to 200 is valid, so there are many possible combinations.
2. Improve the reconnections
- Handle streamNotAvailable, add disconnection info: #343
- Improve the super stream reconnection: #344
- Increase the backoff strategy time: #345
Please see this document if you want to know more about what happens during a broker restart.
The focus is on improving reconnection during a cluster restart.
3. Raw Super stream events
Removed the auto-reconnect. The RawSuperStreamProducer and RawSuperStreamConsumer classes now expose two events, described below.
**NOTE:** If you are using these classes, auto-reconnect has been removed to be consistent with all the other Raw* classes.
You should use Consumer and Producer unless you have a specific use case.
For Raw* users:
- Super Stream: on disconnection, you can inspect the disconnection cause and reconnect the stream, for example:
var consumer = await system.CreateSuperStreamConsumer(configuration);
var completed = new TaskCompletionSource<bool>();
configuration.ConnectionClosedHandler = async (reason, stream) =>
{
    if (reason == ConnectionClosedReason.Unexpected)
    {
        await consumer.ReconnectPartition(
            await system.StreamInfo(stream).ConfigureAwait(false)
        );
        completed.SetResult(true);
    }
};
The same is true for the standard consumer.
- Metadata update
MetadataHandler = async update =>
{
    await consumer.ReconnectPartition(
        await system.StreamInfo(stream).ConfigureAwait(false));
}
4. Add events to Producer and Consumer classes
See: #349
See also: https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/tree/main/docs/ReliableClient where you can find an example of how to use StatusChanged:
producerConfig.StatusChanged += (status) =>
{
    var streamInfo = status.Partition is not null
        ? $" Partition {status.Partition} of super stream: {status.Stream}"
        : $"Stream: {status.Stream}";
    lp.LogInformation("Producer: {Id} - status changed from {From} to {To}. {Info}",
        status.Identifier,
        status.From,
        status.To, streamInfo);
    if (status.To == ReliableEntityStatus.Open)
    {
        publishEvent.Set();
    }
    else
    {
        publishEvent.Reset();
    }
};
5. Update Secret
See #342