Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New API to allow starting a new conversation #5693

Merged
merged 2 commits into from
Aug 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
using NServiceBus.AcceptanceTesting;
using NServiceBus.AcceptanceTesting.Customization;
using NServiceBus.AcceptanceTests.EndpointTemplates;
using NUnit.Framework;
using System.Threading.Tasks;

namespace NServiceBus.AcceptanceTests.Core.Causation
{
public class When_starting_new_conversation_inside_message_handler : NServiceBusAcceptanceTest
{
const string GeneratedConversationId = "Generated Conversation Id";
const string UserDefinedConverstionId = "User Definied Conversation Id";

[Test]
public async Task With_specified_conversation_id()
{
var context = await Scenario.Define<NewConversationScenario>(ctx => ctx.PropsedConversationId = UserDefinedConverstionId)
.WithEndpoint<Sender>(b => b.When(session =>
{
return session.Send(new AnyMessage());
}))
.WithEndpoint<Receiver>()
.Done(ctx => ctx.MessageHandled)
.Run();

Assert.That(context.NewConversationId, Is.EqualTo(UserDefinedConverstionId), "ConversationId should be set to the user defined value.");
Assert.That(context.PreviousConversationId, Is.EqualTo(context.OriginalConversationId), "PreviousConversationId header should be set to the original conversation id.");
}

[Test]
public async Task Without_specified_conversation_id()
{
var context = await Scenario.Define<NewConversationScenario>()
.WithEndpoint<Sender>(b => b.When(session =>
{
return session.Send(new AnyMessage());
}))
.WithEndpoint<Receiver>()
.Done(ctx => ctx.MessageHandled)
.Run();

Assert.That(context.NewConversationId, Is.EqualTo(GeneratedConversationId), "ConversationId should be generated.");
Assert.That(context.NewConversationId, Is.Not.EqualTo(context.OriginalConversationId), "ConversationId should not be equal to the original conversation id.");
Assert.That(context.PreviousConversationId, Is.EqualTo(context.OriginalConversationId), "PreviousConversationId header should be set to the original conversation id.");
}

class NewConversationScenario : ScenarioContext
{
public string PropsedConversationId { get; set; }
public bool MessageHandled { get; set; }
public string OriginalConversationId { get; set; }
public string NewConversationId { get; set; }
public string PreviousConversationId { get; set; }
}

class Sender : EndpointConfigurationBuilder
{
public Sender()
{
EndpointSetup<DefaultServer>(
c => c.ConfigureTransport()
.Routing()
.RouteToEndpoint(typeof(AnyMessage), typeof(Receiver)));
}

class AnyResponseMessageHandler : IHandleMessages<AnyResponseMessage>
{
NewConversationScenario scenario;

public AnyResponseMessageHandler(NewConversationScenario scenario)
{
this.scenario = scenario;
}

public Task Handle(AnyResponseMessage message, IMessageHandlerContext context)
{
if(context.MessageHeaders.TryGetValue(Headers.ConversationId ,out var conversationId))
{
scenario.NewConversationId = conversationId;
}
if(context.MessageHeaders.TryGetValue(Headers.PreviousConversationId, out var previousConversationId))
{
scenario.PreviousConversationId = previousConversationId;
}
scenario.MessageHandled = true;
return Task.FromResult(0);
}
}
}

class Receiver : EndpointConfigurationBuilder
{
public Receiver()
{
EndpointSetup<DefaultServer>(
c =>
{
c.ConfigureTransport()
.Routing()
.RouteToEndpoint(typeof(AnyResponseMessage), typeof(Sender));

c.CustomConversationIdStrategy(ctx => ConversationId.Custom(GeneratedConversationId));
});
}

class AnyMessageHandler : IHandleMessages<AnyMessage>
{
NewConversationScenario scenario;

public AnyMessageHandler(NewConversationScenario scenario)
{
this.scenario = scenario;
}

public Task Handle(AnyMessage message, IMessageHandlerContext context)
{
scenario.OriginalConversationId = context.MessageHeaders[Headers.ConversationId];

var sendOptions = new SendOptions();
sendOptions.StartNewConversation(scenario.PropsedConversationId);

return context.Send(new AnyResponseMessage(), sendOptions);
}
}
}

public class AnyMessage : IMessage { }
public class AnyResponseMessage : IMessage { }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
using NServiceBus.AcceptanceTesting;
using NServiceBus.AcceptanceTests.EndpointTemplates;
using NUnit.Framework;
using System;
using System.Threading.Tasks;

namespace NServiceBus.AcceptanceTests.Core.Causation
{
public class When_starting_new_conversation_outside_message_handler : NServiceBusAcceptanceTest
{
const string NewConversionId = "User Defined Conversation Id";
const string GeneratedConversationId = "Generated Conversation Id";

[Test]
public async Task With_specified_conversation_id()
{
var context = await Scenario.Define<NewConversationScenario>()
.WithEndpoint<NewConversationEndpoint>(b => b.When(async session =>
{
var sendOptions = new SendOptions();
sendOptions.RouteToThisEndpoint();
sendOptions.StartNewConversation(NewConversionId);

await session.Send(new AnyMessage(), sendOptions);
}))
.Done(ctx => ctx.MessageHandled)
.Run();

Assert.That(context.ConversationId, Is.EqualTo(NewConversionId), "ConversationId should be set to configured user defined value.");
Assert.That(context.PreviousConversationId, Is.Null, "Previous ConversationId should not be set when handling a message outside of a message handler.");
}

[Test]
public async Task Without_specified_conversation_id()
{
var context = await Scenario.Define<NewConversationScenario>()
.WithEndpoint<NewConversationEndpoint>(b => b.When(async session =>
{
var sendOptions = new SendOptions();
sendOptions.RouteToThisEndpoint();
sendOptions.StartNewConversation();

await session.Send(new AnyMessage(), sendOptions);
}))
.Done(ctx => ctx.MessageHandled)
.Run();

Assert.That(context.ConversationId, Is.EqualTo(GeneratedConversationId), "ConversationId should be generated.");
Assert.That(context.ConversationId, Is.Not.EqualTo(context.PreviousConversationId), "ConversationId should not match the previous conversationId.");
Assert.That(context.PreviousConversationId, Is.Null, "Previous ConversationId should not be set when handling a message outside of a message handler.");
}

[Test]
public async Task Cannot_set_value_for_header_directly()
{
var overrideConversationId = "Some conversationid";

var context = await Scenario.Define<NewConversationScenario>()
.WithEndpoint<NewConversationEndpoint>(b => b.When(async (session, ctx) =>
{
var sendOptions = new SendOptions();
sendOptions.RouteToThisEndpoint();
sendOptions.StartNewConversation();
sendOptions.SetHeader(Headers.ConversationId, overrideConversationId);

try
{
await session.Send(new AnyMessage(), sendOptions);
}
catch(Exception ex)
{
ctx.ExceptionMessage = ex.Message;
ctx.ExceptionThrown = true;
}
}))
.Done(ctx => ctx.ExceptionThrown)
.Run();

var expectedExceptionMessage = $"Cannot set the NServiceBus.ConversationId header to '{overrideConversationId}' as StartNewConversation() was called.";
Assert.That(context.ExceptionThrown, Is.True, "Exception should be thrown when trying to directly set conversationId");
Assert.That(context.ExceptionMessage, Is.EqualTo(expectedExceptionMessage));
}

public class AnyMessage : IMessage
{

}

class NewConversationEndpoint : EndpointConfigurationBuilder
{
public NewConversationEndpoint()
{
EndpointSetup<DefaultServer>(c => c.CustomConversationIdStrategy(ctx => ConversationId.Custom(GeneratedConversationId)));
}

class AnyMessageHandler : IHandleMessages<AnyMessage>
{
private NewConversationScenario scenario;

public AnyMessageHandler(NewConversationScenario scenario)
{
this.scenario = scenario;
}

public Task Handle(AnyMessage message, IMessageHandlerContext context)
{
if (context.MessageHeaders.TryGetValue(Headers.ConversationId, out var conversationId))
{
scenario.ConversationId = conversationId;
}

if (context.MessageHeaders.TryGetValue(Headers.PreviousConversationId, out var previousConversationId))
{
scenario.PreviousConversationId = previousConversationId;
}

scenario.MessageHandled = true;

return Task.FromResult(0);
}
}
}

class NewConversationScenario : ScenarioContext
{
public bool ExceptionThrown { get; set; }
public string ExceptionMessage { get; set; }
public bool MessageHandled { get; set; }
public string ConversationId { get; set; }
public string PreviousConversationId { get; set; }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ namespace NServiceBus
public System.Collections.Generic.IReadOnlyDictionary<string, string> Headers { get; }
public NServiceBus.Pipeline.OutgoingLogicalMessage Message { get; }
}
public class static ConversationRoutingExtensions
{
public static void StartNewConversation(this NServiceBus.SendOptions sendOptions, string conversationId = null) { }
}
[System.ObsoleteAttribute("Setting a custom correlation ID is no longer supported. Will be removed in versio" +
"n 8.0.0.", true)]
public class static CorrelationContextExtensions
Expand Down Expand Up @@ -405,6 +409,7 @@ namespace NServiceBus
public const string OriginatingSagaId = "NServiceBus.OriginatingSagaId";
public const string OriginatingSagaType = "NServiceBus.OriginatingSagaType";
public const string OriginatingSite = "NServiceBus.OriginatingSite";
public const string PreviousConversationId = "NServiceBus.PreviousConversationId";
public const string ProcessingEnded = "NServiceBus.ProcessingEnded";
public const string ProcessingEndpoint = "NServiceBus.ProcessingEndpoint";
public const string ProcessingMachine = "NServiceBus.ProcessingMachine";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ namespace NServiceBus
public System.Collections.Generic.IReadOnlyDictionary<string, string> Headers { get; }
public NServiceBus.Pipeline.OutgoingLogicalMessage Message { get; }
}
public class static ConversationRoutingExtensions
{
public static void StartNewConversation(this NServiceBus.SendOptions sendOptions, string conversationId = null) { }
}
[System.ObsoleteAttribute("Setting a custom correlation ID is no longer supported. Will be removed in versio" +
"n 8.0.0.", true)]
public class static CorrelationContextExtensions
Expand Down Expand Up @@ -405,6 +409,7 @@ namespace NServiceBus
public const string OriginatingSagaId = "NServiceBus.OriginatingSagaId";
public const string OriginatingSagaType = "NServiceBus.OriginatingSagaType";
public const string OriginatingSite = "NServiceBus.OriginatingSite";
public const string PreviousConversationId = "NServiceBus.PreviousConversationId";
public const string ProcessingEnded = "NServiceBus.ProcessingEnded";
public const string ProcessingEndpoint = "NServiceBus.ProcessingEndpoint";
public const string ProcessingMachine = "NServiceBus.ProcessingMachine";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void When_user_defined_conversation_id_would_overwrite_incoming_conversat

var exception = Assert.ThrowsAsync<Exception>(() => behavior.Invoke(context, ctx => TaskEx.CompletedTask));

Assert.AreEqual($"Cannot set the {Headers.ConversationId} header to '{userDefinedConversationId}' as it cannot override the incoming header value ('{incomingConversationId}').", exception.Message);
Assert.AreEqual($"Cannot set the {Headers.ConversationId} header to '{userDefinedConversationId}' as it cannot override the incoming header value ('{incomingConversationId}'). To start a new conversation use sendOptions.StartNewConversation().", exception.Message);
}

[Test]
Expand Down
21 changes: 19 additions & 2 deletions src/NServiceBus.Core/Causation/AttachCausationHeadersBehavior.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,29 @@ static void SetRelatedToHeader(IOutgoingLogicalMessageContext context, IncomingM

void SetConversationIdHeader(IOutgoingLogicalMessageContext context, IncomingMessage incomingMessage)
{
var conversationIdFromCurrentMessageContext = default(string);
var hasIncomingMessageConversationId = incomingMessage != null && incomingMessage.Headers.TryGetValue(Headers.ConversationId, out conversationIdFromCurrentMessageContext);
var hasUserDefinedConversationId = context.Headers.TryGetValue(Headers.ConversationId, out var userDefinedConversationId);

if (incomingMessage != null && incomingMessage.Headers.TryGetValue(Headers.ConversationId, out var conversationIdFromCurrentMessageContext))
if (context.Extensions.TryGet<string>(NewConversationId, out var newConversationId))
{
if(hasUserDefinedConversationId)
{
throw new Exception($"Cannot set the {Headers.ConversationId} header to '{userDefinedConversationId}' as StartNewConversation() was called.");
}
if (hasIncomingMessageConversationId)
{
context.Headers[Headers.PreviousConversationId] = conversationIdFromCurrentMessageContext;
}
context.Headers[Headers.ConversationId] = newConversationId ?? conversationIdStrategy(context);
return;
}

if (hasIncomingMessageConversationId)
{
if (hasUserDefinedConversationId)
{
throw new Exception($"Cannot set the {Headers.ConversationId} header to '{userDefinedConversationId}' as it cannot override the incoming header value ('{conversationIdFromCurrentMessageContext}').");
throw new Exception($"Cannot set the {Headers.ConversationId} header to '{userDefinedConversationId}' as it cannot override the incoming header value ('{conversationIdFromCurrentMessageContext}'). To start a new conversation use sendOptions.StartNewConversation().");
}

context.Headers[Headers.ConversationId] = conversationIdFromCurrentMessageContext;
Expand All @@ -56,5 +72,6 @@ void SetConversationIdHeader(IOutgoingLogicalMessageContext context, IncomingMes
}

readonly Func<IOutgoingLogicalMessageContext, string> conversationIdStrategy;
public const string NewConversationId = "NewConversationId";
}
}
18 changes: 18 additions & 0 deletions src/NServiceBus.Core/Causation/ConversationRoutingExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
namespace NServiceBus
{
/// <summary>
/// Gives users control of message conversations.
/// </summary>
public static class ConversationRoutingExtensions
{
/// <summary>
/// Start a new messaging conversation.
/// </summary>
/// <param name="sendOptions">The option being extended.</param>
/// <param name="conversationId">The id for the new conversation. If not provided, an id will be generated.</param>
public static void StartNewConversation(this SendOptions sendOptions, string conversationId = null)
{
sendOptions.Context.Set(AttachCausationHeadersBehavior.NewConversationId, conversationId);
}
}
}
5 changes: 5 additions & 0 deletions src/NServiceBus.Core/Headers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,11 @@ public static class Headers
/// </summary>
public const string ConversationId = "NServiceBus.ConversationId";

/// <summary>
/// The id of the previous message conversation that triggered this message.
/// </summary>
public const string PreviousConversationId = "NServiceBus.PreviousConversationId";

/// <summary>
/// The intent of the current message.
/// </summary>
Expand Down