From 92aee5186299cc19bbc007bd502d9f07cb5a4fde Mon Sep 17 00:00:00 2001 From: Igor Clemente Date: Thu, 22 Feb 2024 18:18:21 -0300 Subject: [PATCH] Upgrade to MassTransit v8 --- .../CalculatePurchaseTotalActivity.cs | 15 ++-- .../Play.Trading.Service.csproj | 5 +- src/Play.Trading.Service/Startup.cs | 14 +--- .../StateMachines/PurchaseState.cs | 3 +- .../StateMachines/PurchaseStateMachine.cs | 84 +++++++++---------- 5 files changed, 54 insertions(+), 67 deletions(-) diff --git a/src/Play.Trading.Service/Activities/CalculatePurchaseTotalActivity.cs b/src/Play.Trading.Service/Activities/CalculatePurchaseTotalActivity.cs index e0cfd3c..eea3044 100644 --- a/src/Play.Trading.Service/Activities/CalculatePurchaseTotalActivity.cs +++ b/src/Play.Trading.Service/Activities/CalculatePurchaseTotalActivity.cs @@ -1,7 +1,6 @@ using System; using System.Threading.Tasks; -using Automatonymous; -using GreenPipes; +using MassTransit; using Play.Common; using Play.Trading.Service.Contracts; using Play.Trading.Service.Entities; @@ -10,7 +9,7 @@ namespace Play.Trading.Service.Activities { - public class CalculatePurchaseTotalActivity : Activity + public class CalculatePurchaseTotalActivity : IStateMachineActivity { private readonly IRepository repository; @@ -25,9 +24,9 @@ public void Accept(StateMachineVisitor visitor) visitor.Visit(this); } - public async Task Execute(BehaviorContext context, Behavior next) + public async Task Execute(BehaviorContext context, IBehavior next) { - var message = context.Data; + var message = context.Message; var item = await repository.GetAsync(message.ItemId); @@ -36,13 +35,13 @@ public async Task Execute(BehaviorContext cont throw new UnknownItemException(message.ItemId); } - context.Instance.PurchaseTotal = item.Price * message.Quantity; - context.Instance.LastUpdated = DateTimeOffset.UtcNow; + context.Saga.PurchaseTotal = item.Price * message.Quantity; + context.Saga.LastUpdated = DateTimeOffset.UtcNow; await next.Execute(context).ConfigureAwait(false); } - public Task Faulted(BehaviorExceptionContext context, Behavior next) where TException : Exception + public Task Faulted(BehaviorExceptionContext context, IBehavior next) where TException : Exception { return next.Faulted(context); } diff --git a/src/Play.Trading.Service/Play.Trading.Service.csproj b/src/Play.Trading.Service/Play.Trading.Service.csproj index 35405cb..c7e1388 100644 --- a/src/Play.Trading.Service/Play.Trading.Service.csproj +++ b/src/Play.Trading.Service/Play.Trading.Service.csproj @@ -5,10 +5,9 @@ - - + - + diff --git a/src/Play.Trading.Service/Startup.cs b/src/Play.Trading.Service/Startup.cs index 1b87fff..71b0156 100644 --- a/src/Play.Trading.Service/Startup.cs +++ b/src/Play.Trading.Service/Startup.cs @@ -1,19 +1,14 @@ using System; -using System.Collections.Generic; -using System.Linq; using System.Reflection; -using System.Threading.Tasks; -using GreenPipes; +using System.Text.Json; +using System.Text.Json.Serialization; using MassTransit; using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Hosting; -using Microsoft.AspNetCore.HttpsPolicy; -using Microsoft.AspNetCore.Mvc; using Microsoft.AspNetCore.SignalR; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; -using Microsoft.Extensions.Logging; using Microsoft.OpenApi.Models; using Play.Common.HealthChecks; using Play.Common.Identity; @@ -58,7 +53,7 @@ public void ConfigureServices(IServiceCollection services) { options.SuppressAsyncSuffixInActionNames = false; }) - .AddJsonOptions(options => options.JsonSerializerOptions.IgnoreNullValues = true); + .AddJsonOptions(options => options.JsonSerializerOptions.DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull); services.AddSwaggerGen(c => { @@ -138,9 +133,6 @@ public void AddMassTransit(IServiceCollection services) EndpointConvention.Map(new Uri(queueSettings.GrantItemsQueueAddress)); EndpointConvention.Map(new Uri(queueSettings.DebitGilQueueAddress)); EndpointConvention.Map(new Uri(queueSettings.SubtractItemsQueueAddress)); - - services.AddMassTransitHostedService(); - services.AddGenericRequestClient(); } } } diff --git a/src/Play.Trading.Service/StateMachines/PurchaseState.cs b/src/Play.Trading.Service/StateMachines/PurchaseState.cs index 0385952..848f63c 100644 --- a/src/Play.Trading.Service/StateMachines/PurchaseState.cs +++ b/src/Play.Trading.Service/StateMachines/PurchaseState.cs @@ -1,6 +1,5 @@ using System; -using Automatonymous; -using MassTransit.Saga; +using MassTransit; namespace Play.Trading.Service.StateMachines { diff --git a/src/Play.Trading.Service/StateMachines/PurchaseStateMachine.cs b/src/Play.Trading.Service/StateMachines/PurchaseStateMachine.cs index d39c764..6ad683f 100644 --- a/src/Play.Trading.Service/StateMachines/PurchaseStateMachine.cs +++ b/src/Play.Trading.Service/StateMachines/PurchaseStateMachine.cs @@ -1,6 +1,4 @@ using System; -using System.Threading; -using Automatonymous; using DnsClient.Internal; using MassTransit; using Microsoft.Extensions.Logging; @@ -55,38 +53,38 @@ private void ConfigureInitialState() When(PurchaseRequested) .Then(context => { - context.Instance.UserId = context.Data.UserId; - context.Instance.ItemId = context.Data.ItemId; - context.Instance.Quantity = context.Data.Quantity; - context.Instance.Received = DateTimeOffset.UtcNow; - context.Instance.LastUpdated = context.Instance.Received; + context.Saga.UserId = context.Message.UserId; + context.Saga.ItemId = context.Message.ItemId; + context.Saga.Quantity = context.Message.Quantity; + context.Saga.Received = DateTimeOffset.UtcNow; + context.Saga.LastUpdated = context.Saga.Received; logger.LogInformation( "Calculating total price for purchase with CorrelationId {CorrelationId}...", - context.Instance.CorrelationId + context.Saga.CorrelationId ); }) .Activity(x => x.OfType()) .Send(context => new GrantItems( - context.Instance.UserId, - context.Instance.ItemId, - context.Instance.Quantity, - context.Instance.CorrelationId + context.Saga.UserId, + context.Saga.ItemId, + context.Saga.Quantity, + context.Saga.CorrelationId )) .TransitionTo(Accepted) .Catch(ex => ex. Then(context => { - context.Instance.ErrorMessage = context.Exception.Message; - context.Instance.LastUpdated = DateTimeOffset.UtcNow; + context.Saga.ErrorMessage = context.Exception.Message; + context.Saga.LastUpdated = DateTimeOffset.UtcNow; logger.LogError( context.Exception, "Could not calculate the total price of purchase with CorrelationId {CorrelationId}. Error: {ErrorMessage}", - context.Instance.CorrelationId, - context.Instance.ErrorMessage + context.Saga.CorrelationId, + context.Saga.ErrorMessage ); }) .TransitionTo(Faulted) - .ThenAsync(async context => await hub.SendStatusAsync(context.Instance)) + .ThenAsync(async context => await hub.SendStatusAsync(context.Saga)) ) ); } @@ -97,32 +95,32 @@ private void ConfigureAccepted() When(InventoryItemsGranted) .Then(context => { - context.Instance.LastUpdated = DateTimeOffset.UtcNow; + context.Saga.LastUpdated = DateTimeOffset.UtcNow; logger.LogInformation( "Items of purchase with CorrelationId {CorrelationId} have been granted to user {UserId}. ", - context.Instance.CorrelationId, - context.Instance.UserId + context.Saga.CorrelationId, + context.Saga.UserId ); }) .Send(context => new DebitGil( - context.Instance.UserId, - context.Instance.PurchaseTotal.Value, - context.Instance.CorrelationId + context.Saga.UserId, + context.Saga.PurchaseTotal.Value, + context.Saga.CorrelationId )) .TransitionTo(ItemsGranted), When(GrantItemsFaulted) .Then(context => { - context.Instance.ErrorMessage = context.Data.Exceptions[0].Message; - context.Instance.LastUpdated = DateTimeOffset.UtcNow; + context.Saga.ErrorMessage = context.Message.Exceptions[0].Message; + context.Saga.LastUpdated = DateTimeOffset.UtcNow; logger.LogError( "Could not grant items for purchase with CorrelationId {CorrelationId}. Error: {ErrorMessage}", - context.Instance.CorrelationId, - context.Instance.ErrorMessage + context.Saga.CorrelationId, + context.Saga.ErrorMessage ); }) .TransitionTo(Faulted) - .ThenAsync(async context => await hub.SendStatusAsync(context.Instance)) + .ThenAsync(async context => await hub.SendStatusAsync(context.Saga)) ); } @@ -132,35 +130,35 @@ private void ConfigureItemsGranted() When(GilDebited) .Then(context => { - context.Instance.LastUpdated = DateTimeOffset.UtcNow; + context.Saga.LastUpdated = DateTimeOffset.UtcNow; logger.LogInformation( "The total of price of purchase with CorrelationId {CorrelationId} has been debited from user {UserId}. Purchase complete.", - context.Instance.CorrelationId, - context.Instance.UserId + context.Saga.CorrelationId, + context.Saga.UserId ); }) .TransitionTo(Completed) - .ThenAsync(async context => await hub.SendStatusAsync(context.Instance)), + .ThenAsync(async context => await hub.SendStatusAsync(context.Saga)), When(DebitGilFaulted) .Send(context => new SubtractItems( - context.Instance.UserId, - context.Instance.ItemId, - context.Instance.Quantity, - context.Instance.CorrelationId + context.Saga.UserId, + context.Saga.ItemId, + context.Saga.Quantity, + context.Saga.CorrelationId )) .Then(context => { - context.Instance.ErrorMessage = context.Data.Exceptions[0].Message; - context.Instance.LastUpdated = DateTimeOffset.UtcNow; + context.Saga.ErrorMessage = context.Message.Exceptions[0].Message; + context.Saga.LastUpdated = DateTimeOffset.UtcNow; logger.LogError( "Could not debit the total price of purchase with CorrelationId {CorrelationId} from user {UserId}. Error: {ErrorMessage}.", - context.Instance.CorrelationId, - context.Instance.UserId, - context.Instance.ErrorMessage + context.Saga.CorrelationId, + context.Saga.UserId, + context.Saga.ErrorMessage ); }) .TransitionTo(Faulted) - .ThenAsync(async context => await hub.SendStatusAsync(context.Instance)) + .ThenAsync(async context => await hub.SendStatusAsync(context.Saga)) ); } @@ -168,7 +166,7 @@ private void ConfigureAny() { DuringAny( When(GetPurchaseState) - .Respond(x => x.Instance) + .Respond(x => x.Saga) ); }