-
Notifications
You must be signed in to change notification settings - Fork 40
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
added publisher confirms functionality to RabbitMQ #1906
added publisher confirms functionality to RabbitMQ #1906
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Detailed my changes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
End to end spec using AmpqSink
and AmpqFlow
with publisher confirmations enabled
@@ -12,11 +13,11 @@ namespace Akka.Streams.Amqp.RabbitMq | |||
public class AmqpFlowStage<TPassThrough> : GraphStageWithMaterializedValue<FlowShape<(OutgoingMessage, TPassThrough), TPassThrough>, Task> | |||
{ | |||
public static readonly Attributes DefaultAttributes = | |||
Attributes.CreateName("AmqpFlow").And(ActorAttributes.CreateDispatcher("akka.stream.default-blocking-io-dispatcher")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to run on the blocking I/O dispatcher
@@ -59,6 +60,13 @@ public Logic(AmqpFlowStage<TPassThrough> stage, TaskCompletionSource<Done> promi | |||
elem.Mandatory, | |||
elem.Properties, | |||
elem.Bytes.ToArray()); | |||
|
|||
if(_stage.Settings.WaitForConfirms) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Upon publish, wait for confirms before we push downstream.
if (_stage.Settings.WaitForConfirms) | ||
{ | ||
// enable publisher confirms | ||
Channel.ConfirmSelect(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need this in order to enable publisher confirmations on the channel
@@ -56,6 +58,14 @@ public AmqpSinkStageLogic(AmqpSinkStage stage, TaskCompletionSource<Done> promis | |||
elem.Mandatory, | |||
elem.Properties, | |||
elem.Bytes.ToArray()); | |||
|
|||
if(_stage.Settings.WaitForConfirms) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mirrors what we did in the flow
if (_stage.Settings.WaitForConfirms) | ||
{ | ||
// enable publisher confirms | ||
Channel.ConfirmSelect(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mirrors what we did in the flow
/// </remarks> | ||
public bool WaitForConfirms => WaitForConfirmsTimeout is not null; | ||
|
||
public TimeSpan? WaitForConfirmsTimeout { get; } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Settings - when this gets set to a non-null
value that will enabled publisher confirms using the specified timeout.
Changes
Implements https://www.rabbitmq.com/docs/confirms#publisher-confirms functionality to
AmqpSink
andAmqpFlow
stages, for an extra dose of guaranteed reliability.Checklist