Skip to content

Commit

Permalink
Merge pull request #6101 from Particular/backport-sc-notifications
Browse files Browse the repository at this point in the history
Backport ServiceControl Retry Notifications
  • Loading branch information
tmasternak authored Jul 8, 2021
2 parents 7e2835b + fe74da7 commit 68e66e1
Show file tree
Hide file tree
Showing 10 changed files with 516 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,14 @@ context0 => value(NServiceBus.EnforceUnsubscribeBestPracticesBehavior).Invoke(co
context1 => value(NServiceBus.MessageDrivenUnsubscribeTerminator).Invoke(context1, value(System.Func`2[NServiceBus.Pipeline.PipelineTerminator`1+ITerminatingContext[NServiceBus.Pipeline.IUnsubscribeContext],System.Threading.Tasks.Task`1[System.Threading.Tasks.VoidTaskResult]])),

context0 => value(NServiceBus.AcceptanceTesting.Support.CaptureExceptionBehavior).Invoke(context0, value(System.Func`2[NServiceBus.Pipeline.ITransportReceiveContext,System.Threading.Tasks.Task])),
context1 => value(NServiceBus.TransportReceiveToPhysicalMessageConnector).Invoke(context1, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])),
context2 => value(NServiceBus.ProcessingStatisticsBehavior).Invoke(context2, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])),
context3 => value(NServiceBus.MutateIncomingTransportMessageBehavior).Invoke(context3, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])),
context4 => value(NServiceBus.SubscriptionReceiverBehavior).Invoke(context4, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])),
context5 => value(NServiceBus.UnitOfWorkBehavior).Invoke(context5, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])),
context6 => value(NServiceBus.DeserializeMessageConnector).Invoke(context6, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])),
context7 => value(NServiceBus.MutateIncomingMessageBehavior).Invoke(context7, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])),
context8 => value(NServiceBus.InferredMessageTypeEnricherBehavior).Invoke(context8, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])),
context9 => value(NServiceBus.LoadHandlersConnector).Invoke(context9, value(System.Func`2[NServiceBus.Pipeline.IInvokeHandlerContext,System.Threading.Tasks.Task])),
context10 => value(NServiceBus.InvokeHandlerTerminator).Invoke(context10, value(System.Func`2[NServiceBus.Pipeline.PipelineTerminator`1+ITerminatingContext[NServiceBus.Pipeline.IInvokeHandlerContext],System.Threading.Tasks.Task`1[System.Threading.Tasks.VoidTaskResult]])),
context1 => value(NServiceBus.RetryAcknowledgementBehavior).Invoke(context1, value(System.Func`2[NServiceBus.Pipeline.ITransportReceiveContext,System.Threading.Tasks.Task])),
context2 => value(NServiceBus.TransportReceiveToPhysicalMessageConnector).Invoke(context2, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])),
context3 => value(NServiceBus.ProcessingStatisticsBehavior).Invoke(context3, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])),
context4 => value(NServiceBus.MutateIncomingTransportMessageBehavior).Invoke(context4, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])),
context5 => value(NServiceBus.SubscriptionReceiverBehavior).Invoke(context5, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])),
context6 => value(NServiceBus.UnitOfWorkBehavior).Invoke(context6, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])),
context7 => value(NServiceBus.DeserializeMessageConnector).Invoke(context7, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])),
context8 => value(NServiceBus.MutateIncomingMessageBehavior).Invoke(context8, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])),
context9 => value(NServiceBus.InferredMessageTypeEnricherBehavior).Invoke(context9, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])),
context10 => value(NServiceBus.LoadHandlersConnector).Invoke(context10, value(System.Func`2[NServiceBus.Pipeline.IInvokeHandlerContext,System.Threading.Tasks.Task])),
context11 => value(NServiceBus.InvokeHandlerTerminator).Invoke(context11, value(System.Func`2[NServiceBus.Pipeline.PipelineTerminator`1+ITerminatingContext[NServiceBus.Pipeline.IInvokeHandlerContext],System.Threading.Tasks.Task`1[System.Threading.Tasks.VoidTaskResult]])),
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
namespace NServiceBus.AcceptanceTests.Recoverability
{
using System;
using System.Threading.Tasks;
using AcceptanceTesting;
using AcceptanceTesting.Customization;
using EndpointTemplates;
using Extensibility;
using Features;
using NServiceBus.Pipeline;
using NServiceBus.Routing;
using NUnit.Framework;
using Transport;
using Unicast.Transport;

public class When_retrying_control_message_from_error_queue : NServiceBusAcceptanceTest
{
static readonly string RetryId = Guid.NewGuid().ToString("D");

[Test]
public async Task Should_confirm_successful_processing()
{
Requires.MessageDrivenPubSub(); //required for subscription control message support

var context = await Scenario.Define<Context>()
.WithEndpoint<ProcessingEndpoint>()
.WithEndpoint<RetryAckSpy>()
.Done(c => c.ConfirmedRetryId != null)
.Run();

Assert.AreEqual(RetryId, context.ConfirmedRetryId);
var processingTime = DateTimeExtensions.ToUtcDateTime(context.RetryProcessingTimestamp);
Assert.That(processingTime, Is.EqualTo(DateTime.UtcNow).Within(TimeSpan.FromMinutes(1)));
}

class Context : ScenarioContext
{
public string ConfirmedRetryId { get; set; }
public string RetryProcessingTimestamp { get; set; }
}

class ProcessingEndpoint : EndpointConfigurationBuilder
{
public ProcessingEndpoint() => EndpointSetup<DefaultServer>(c =>
{
c.EnableFeature<ControlMessageFeature>();
});

class ControlMessageFeature : Feature
{
protected override void Setup(FeatureConfigurationContext context)
{
context.RegisterStartupTask(s =>
new ControlMessageSender(s.Build<IDispatchMessages>()));
}
}

class ControlMessageSender : FeatureStartupTask
{
IDispatchMessages dispatcher;

public ControlMessageSender(IDispatchMessages dispatcher)
{
this.dispatcher = dispatcher;
}

protected override async Task OnStart(IMessageSession session)
{
var controlMessage = ControlMessageFactory.Create(MessageIntentEnum.Subscribe);
// set necessary subscription control message headers
controlMessage.Headers.Add(Headers.SubscriptionMessageType, typeof(object).AssemblyQualifiedName);
controlMessage.Headers.Add(Headers.ReplyToAddress, "TestSubscriberAddress");
// set SC headers
controlMessage.Headers.Add("ServiceControl.Retry.UniqueMessageId", RetryId);
controlMessage.Headers.Add("ServiceControl.Retry.AcknowledgementQueue", Conventions.EndpointNamingConvention(typeof(RetryAckSpy)));
var messageOperation = new TransportOperation(controlMessage, new UnicastAddressTag(Conventions.EndpointNamingConvention(typeof(ProcessingEndpoint))));
await dispatcher.Dispatch(new TransportOperations(messageOperation), new TransportTransaction(), new ContextBag());
}

protected override Task OnStop(IMessageSession session) => Task.FromResult(0);
}
}

class RetryAckSpy : EndpointConfigurationBuilder
{
public RetryAckSpy() => EndpointSetup<DefaultServer>((e, r) => e.Pipeline.Register(
new ControlMessageBehavior(r.ScenarioContext as Context),
"Checks for confirmation control message"));

class ControlMessageBehavior : Behavior<IIncomingPhysicalMessageContext>
{
Context testContext;

public ControlMessageBehavior(Context testContext)
{
this.testContext = testContext;
}

public override async Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
{
await next();

testContext.ConfirmedRetryId = context.MessageHeaders["ServiceControl.Retry.UniqueMessageId"];
testContext.RetryProcessingTimestamp = context.MessageHeaders["ServiceControl.Retry.Successful"];
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
namespace NServiceBus.AcceptanceTests.Recoverability
{
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using AcceptanceTesting;
using AcceptanceTesting.Customization;
using EndpointTemplates;
using NServiceBus.Pipeline;
using NUnit.Framework;

public class When_retrying_message_from_error_queue : NServiceBusAcceptanceTest
{
[Test]
public async Task Should_confirm_successful_processing()
{
var retryId = Guid.NewGuid().ToString("D");

var context = await Scenario.Define<Context>()
.WithEndpoint<ProcessingEndpoint>(e => e
.When(s =>
{
var sendOptions = new SendOptions();
sendOptions.RouteToThisEndpoint();
// set SC retry header information
sendOptions.SetHeader("ServiceControl.Retry.UniqueMessageId", retryId);
sendOptions.SetHeader("ServiceControl.Retry.AcknowledgementQueue", Conventions.EndpointNamingConvention(typeof(RetryAckSpy)));
return s.Send(new FailedMessage(), sendOptions);
}))
.WithEndpoint<RetryAckSpy>()
.WithEndpoint<AuditSpy>()
.Done(c => c.ConfirmedRetryId != null && c.AuditHeaders != null)
.Run();

Assert.IsTrue(context.MessageProcessed);
Assert.AreEqual(retryId, context.ConfirmedRetryId);
var processingTime = DateTimeExtensions.ToUtcDateTime(context.RetryProcessingTimestamp);
Assert.That(processingTime, Is.EqualTo(DateTime.UtcNow).Within(TimeSpan.FromMinutes(1)));
Assert.IsTrue(context.AuditHeaders.ContainsKey("ServiceControl.Retry.AcknowledgementSent"));
}

class Context : ScenarioContext
{
public string ConfirmedRetryId { get; set; }
public string RetryProcessingTimestamp { get; set; }
public bool MessageProcessed { get; set; }
public IReadOnlyDictionary<string, string> AuditHeaders { get; set; }
}

class ProcessingEndpoint : EndpointConfigurationBuilder
{
public ProcessingEndpoint()
{
EndpointSetup<DefaultServer>(c =>
{
c.AuditProcessedMessagesTo<AuditSpy>();
});
}

class FailedMessageHandler : IHandleMessages<FailedMessage>
{
Context testContext;

public FailedMessageHandler(Context testContext)
{
this.testContext = testContext;
}

public Task Handle(FailedMessage message, IMessageHandlerContext context)
{
testContext.MessageProcessed = true;
return Task.FromResult(0);
}
}
}

class RetryAckSpy : EndpointConfigurationBuilder
{
public RetryAckSpy() => EndpointSetup<DefaultServer>((e, r) => e.Pipeline.Register(
new ControlMessageBehavior(r.ScenarioContext as Context),
"Checks for confirmation control message"));

class ControlMessageBehavior : Behavior<IIncomingPhysicalMessageContext>
{
Context testContext;

public ControlMessageBehavior(Context testContext)
{
this.testContext = testContext;
}

public override async Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
{
await next();

testContext.ConfirmedRetryId = context.MessageHeaders["ServiceControl.Retry.UniqueMessageId"];
testContext.RetryProcessingTimestamp = context.MessageHeaders["ServiceControl.Retry.Successful"];
}
}
}

class AuditSpy : EndpointConfigurationBuilder
{
public AuditSpy() => EndpointSetup<DefaultServer>();

class FailedMessageHandler : IHandleMessages<FailedMessage>
{
Context testContext;

public FailedMessageHandler(Context testContext)
{
this.testContext = testContext;
}

public Task Handle(FailedMessage message, IMessageHandlerContext context)
{
testContext.AuditHeaders = context.MessageHeaders;
return Task.FromResult(0);
}
}

class ControlMessageBehavior : Behavior<IIncomingPhysicalMessageContext>
{
Context testContext;

public ControlMessageBehavior(Context testContext)
{
this.testContext = testContext;
}

public override async Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
{
await next();

testContext.ConfirmedRetryId = context.MessageHeaders["ServiceControl.Retry.UniqueMessageId"];
testContext.RetryProcessingTimestamp = context.MessageHeaders["ServiceControl.Retry.Successful"];
}
}
}

public class FailedMessage : IMessage
{
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1690,6 +1690,10 @@ namespace NServiceBus.Features
{
protected override void Setup(NServiceBus.Features.FeatureConfigurationContext context) { }
}
public class PlatformRetryNotifications : NServiceBus.Features.Feature
{
protected override void Setup(NServiceBus.Features.FeatureConfigurationContext context) { }
}
[System.ObsoleteAttribute("Performance counters have been released as a separate package: NServiceBus.Metric" +
"s.PerformanceCounters. Will be removed in version 8.0.0.", true)]
public class ReceiveStatisticsPerformanceCounters
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1692,6 +1692,10 @@ namespace NServiceBus.Features
{
protected override void Setup(NServiceBus.Features.FeatureConfigurationContext context) { }
}
public class PlatformRetryNotifications : NServiceBus.Features.Feature
{
protected override void Setup(NServiceBus.Features.FeatureConfigurationContext context) { }
}
[System.ObsoleteAttribute("Performance counters have been released as a separate package: NServiceBus.Metric" +
"s.PerformanceCounters. Will be removed in version 8.0.0.", true)]
public class ReceiveStatisticsPerformanceCounters
Expand Down
Loading

0 comments on commit 68e66e1

Please sign in to comment.