diff --git a/samples/DurableTask.Samples/Program.cs b/samples/DurableTask.Samples/Program.cs index c29c9f61d..5f9e08d79 100644 --- a/samples/DurableTask.Samples/Program.cs +++ b/samples/DurableTask.Samples/Program.cs @@ -104,6 +104,14 @@ static void Main(string[] args) case "Signal": instance = taskHubClient.CreateOrchestrationInstanceAsync(typeof(SignalOrchestration), instanceId, null).Result; break; + case "SignalAndRaise": + if (options.Parameters == null || options.Parameters.Length != 1) + { + throw new ArgumentException("parameters"); + } + + instance = taskHubClient.CreateOrchestrationInstanceWithRaisedEventAsync(typeof(SignalOrchestration), instanceId, null, options.Signal, options.Parameters[0]).Result; + break; case "Replat": instance = taskHubClient.CreateOrchestrationInstanceAsync(typeof(MigrateOrchestration), instanceId, new MigrateOrchestrationData() { SubscriptionId = "03a1cd39-47ac-4a57-9ff5-a2c2a2a76088", IsDisabled = false }).Result; diff --git a/samples/DurableTask.Samples/Signal/SignalOrchestration.cs b/samples/DurableTask.Samples/Signal/SignalOrchestration.cs index 60e434502..c9053764c 100644 --- a/samples/DurableTask.Samples/Signal/SignalOrchestration.cs +++ b/samples/DurableTask.Samples/Signal/SignalOrchestration.cs @@ -23,7 +23,7 @@ public class SignalOrchestration : TaskOrchestration public override async Task RunTask(OrchestrationContext context, string input) { string user = await WaitForSignal(); - string greeting = await context.ScheduleTask("DurableTaskSamples.Greetings.SendGreetingTask", string.Empty, user); + string greeting = await context.ScheduleTask("DurableTask.Samples.Greetings.SendGreetingTask", string.Empty, user); return greeting; } diff --git a/src/DurableTask.Framework/IOrchestrationServiceClient.cs b/src/DurableTask.Framework/IOrchestrationServiceClient.cs index 43027be0a..ea4df4517 100644 --- a/src/DurableTask.Framework/IOrchestrationServiceClient.cs +++ b/src/DurableTask.Framework/IOrchestrationServiceClient.cs @@ -37,6 +37,13 @@ public interface IOrchestrationServiceClient /// Task SendTaskOrchestrationMessageAsync(TaskMessage message); + /// + /// Send a new set of messages for an orchestration + /// + /// Messages to send + /// + Task SendTaskOrchestrationMessageBatchAsync(params TaskMessage[] messages); + /// /// Wait for an orchestration to reach any terminal state within the given timeout /// diff --git a/src/DurableTask.Framework/ServiceBusOrchestrationService.cs b/src/DurableTask.Framework/ServiceBusOrchestrationService.cs index 5475f0357..90c0c359a 100644 --- a/src/DurableTask.Framework/ServiceBusOrchestrationService.cs +++ b/src/DurableTask.Framework/ServiceBusOrchestrationService.cs @@ -222,13 +222,13 @@ public async Task CreateAsync(bool recreateInstanceStore) NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString); await Task.WhenAll( - this.SafeDeleteAndCreateQueueAsync(namespaceManager, orchestratorEntityName, true, true, Settings.MaxTaskOrchestrationDeliveryCount), - this.SafeDeleteAndCreateQueueAsync(namespaceManager, workerEntityName, false, false, Settings.MaxTaskActivityDeliveryCount) + this.SafeDeleteAndCreateQueueAsync(namespaceManager, orchestratorEntityName, true, true, Settings.MaxTaskOrchestrationDeliveryCount, Settings.MaxQueueSizeInMegabytes), + this.SafeDeleteAndCreateQueueAsync(namespaceManager, workerEntityName, false, false, Settings.MaxTaskActivityDeliveryCount, Settings.MaxQueueSizeInMegabytes) ); if (InstanceStore != null) { - await this.SafeDeleteAndCreateQueueAsync(namespaceManager, trackingEntityName, true, false, Settings.MaxTrackingDeliveryCount); + await this.SafeDeleteAndCreateQueueAsync(namespaceManager, trackingEntityName, true, false, Settings.MaxTrackingDeliveryCount, Settings.MaxQueueSizeInMegabytes); await InstanceStore.InitializeStoreAsync(recreateInstanceStore); } } @@ -241,12 +241,12 @@ public async Task CreateIfNotExistsAsync() NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString); await Task.WhenAll( - SafeCreateQueueAsync(namespaceManager, orchestratorEntityName, true, true, Settings.MaxTaskOrchestrationDeliveryCount), - SafeCreateQueueAsync(namespaceManager, workerEntityName, false, false, Settings.MaxTaskActivityDeliveryCount) + SafeCreateQueueAsync(namespaceManager, orchestratorEntityName, true, true, Settings.MaxTaskOrchestrationDeliveryCount, Settings.MaxQueueSizeInMegabytes), + SafeCreateQueueAsync(namespaceManager, workerEntityName, false, false, Settings.MaxTaskActivityDeliveryCount, Settings.MaxQueueSizeInMegabytes) ); if (InstanceStore != null) { - await SafeCreateQueueAsync(namespaceManager, trackingEntityName, true, false, Settings.MaxTrackingDeliveryCount); + await SafeCreateQueueAsync(namespaceManager, trackingEntityName, true, false, Settings.MaxTrackingDeliveryCount, Settings.MaxQueueSizeInMegabytes); await InstanceStore.InitializeStoreAsync(false); } } @@ -902,7 +902,10 @@ public Task AbandonTaskActivityWorkItemAsync(TaskActivityWorkItem workItem) { var message = GetAndDeleteBrokeredMessageForWorkItem(workItem); TraceHelper.Trace(TraceEventType.Information, $"Abandoning message {workItem?.Id}"); - return message?.AbandonAsync(); + + return message == null + ? Task.FromResult(null) + : message.AbandonAsync(); } /// @@ -969,12 +972,41 @@ public async Task UpdateJumpStartStoreAsync(TaskMessage creationMessage) } /// - /// Send an orchestration message + /// Sends an orchestration message /// /// The task message to be sent for the orchestration - public async Task SendTaskOrchestrationMessageAsync(TaskMessage message) + public Task SendTaskOrchestrationMessageAsync(TaskMessage message) { - BrokeredMessage brokeredMessage = await ServiceBusUtils.GetBrokeredMessageFromObjectAsync( + return SendTaskOrchestrationMessageBatchAsync(message); + } + + /// + /// Sends a set of orchestration messages + /// + /// The task messages to be sent for the orchestration + public async Task SendTaskOrchestrationMessageBatchAsync(params TaskMessage[] messages) + { + if (messages.Length == 0) + { + return; + } + + var tasks = new Task[messages.Length]; + for (int i = 0; i < messages.Length; i++) + { + tasks[i] = GetBrokeredMessageAsync(messages[i]); + } + + var brokeredMessages = await Task.WhenAll(tasks); + + MessageSender sender = await messagingFactory.CreateMessageSenderAsync(orchestratorEntityName).ConfigureAwait(false); + await sender.SendBatchAsync(brokeredMessages).ConfigureAwait(false); + await sender.CloseAsync().ConfigureAwait(false); + } + + async Task GetBrokeredMessageAsync(TaskMessage message) + { + var brokeredMessage = await ServiceBusUtils.GetBrokeredMessageFromObjectAsync( message, Settings.MessageCompressionSettings, Settings.MessageSettings, @@ -987,12 +1019,10 @@ public async Task SendTaskOrchestrationMessageAsync(TaskMessage message) var executionStartedEvent = message.Event as ExecutionStartedEvent; if (executionStartedEvent != null) { - brokeredMessage.MessageId = string.Format(CultureInfo.InvariantCulture, $"{executionStartedEvent.OrchestrationInstance.InstanceId}_{executionStartedEvent.OrchestrationInstance.ExecutionId}"); + brokeredMessage.MessageId = $"{executionStartedEvent.OrchestrationInstance.InstanceId}_{executionStartedEvent.OrchestrationInstance.ExecutionId}"; } - MessageSender sender = await messagingFactory.CreateMessageSenderAsync(orchestratorEntityName).ConfigureAwait(false); - await sender.SendAsync(brokeredMessage).ConfigureAwait(false); - await sender.CloseAsync().ConfigureAwait(false); + return brokeredMessage; } /// @@ -1471,13 +1501,14 @@ async Task SafeCreateQueueAsync( string path, bool requiresSessions, bool requiresDuplicateDetection, - int maxDeliveryCount) + int maxDeliveryCount, + long maxSizeInMegabytes) { await Utils.ExecuteWithRetries(async () => { try { - await CreateQueueAsync(namespaceManager, path, requiresSessions, requiresDuplicateDetection, maxDeliveryCount); + await CreateQueueAsync(namespaceManager, path, requiresSessions, requiresDuplicateDetection, maxDeliveryCount, maxSizeInMegabytes); } catch (MessagingEntityAlreadyExistsException) { @@ -1491,25 +1522,35 @@ async Task SafeDeleteAndCreateQueueAsync( string path, bool requiresSessions, bool requiresDuplicateDetection, - int maxDeliveryCount) + int maxDeliveryCount, + long maxSizeInMegabytes) { await SafeDeleteQueueAsync(namespaceManager, path); - await SafeCreateQueueAsync(namespaceManager, path, requiresSessions, requiresDuplicateDetection, maxDeliveryCount); + await SafeCreateQueueAsync(namespaceManager, path, requiresSessions, requiresDuplicateDetection, maxDeliveryCount, maxSizeInMegabytes); } + static readonly long[] ValidQueueSizes = { 1024L, 2048L, 3072L, 4096L, 5120L }; + async Task CreateQueueAsync( NamespaceManager namespaceManager, string path, bool requiresSessions, bool requiresDuplicateDetection, - int maxDeliveryCount) + int maxDeliveryCount, + long maxSizeInMegabytes) { + if (!ValidQueueSizes.Contains(maxSizeInMegabytes)) + { + throw new ArgumentException($"The specified value {maxSizeInMegabytes} is invalid for the maximum queue size in megabytes.\r\nIt must be one of the following values:\r\n{string.Join(";", ValidQueueSizes)}", nameof(maxSizeInMegabytes)); + } + var description = new QueueDescription(path) { RequiresSession = requiresSessions, MaxDeliveryCount = maxDeliveryCount, RequiresDuplicateDetection = requiresDuplicateDetection, - DuplicateDetectionHistoryTimeWindow = TimeSpan.FromHours(DuplicateDetectionWindowInHours) + DuplicateDetectionHistoryTimeWindow = TimeSpan.FromHours(DuplicateDetectionWindowInHours), + MaxSizeInMegabytes = maxSizeInMegabytes }; await namespaceManager.CreateQueueAsync(description); diff --git a/src/DurableTask.Framework/Settings/ServiceBusOrchestrationServiceSettings.cs b/src/DurableTask.Framework/Settings/ServiceBusOrchestrationServiceSettings.cs index dea64c422..709d1bc4a 100644 --- a/src/DurableTask.Framework/Settings/ServiceBusOrchestrationServiceSettings.cs +++ b/src/DurableTask.Framework/Settings/ServiceBusOrchestrationServiceSettings.cs @@ -60,6 +60,11 @@ public ServiceBusOrchestrationServiceSettings() /// public int MaxTrackingDeliveryCount { get; set; } + /// + /// Maximum queue size, in megabytes, for the service bus queues + /// + public long MaxQueueSizeInMegabytes { get; set; } = 1024L; + /// /// Gets the message prefetch count /// diff --git a/src/DurableTask.Framework/TaskHubClient.cs b/src/DurableTask.Framework/TaskHubClient.cs index 416a9fac1..97b51d765 100644 --- a/src/DurableTask.Framework/TaskHubClient.cs +++ b/src/DurableTask.Framework/TaskHubClient.cs @@ -57,10 +57,14 @@ public TaskHubClient(IOrchestrationServiceClient serviceClient) /// OrchestrationInstance that represents the orchestration that was created public Task CreateOrchestrationInstanceAsync(Type orchestrationType, object input) { - return CreateOrchestrationInstanceAsync( + return InternalCreateOrchestrationInstanceWithRaisedEventAsync( NameVersionHelper.GetDefaultName(orchestrationType), NameVersionHelper.GetDefaultVersion(orchestrationType), - input); + null, + input, + null, + null, + null); } /// @@ -75,11 +79,14 @@ public Task CreateOrchestrationInstanceAsync( string instanceId, object input) { - return CreateOrchestrationInstanceAsync( + return InternalCreateOrchestrationInstanceWithRaisedEventAsync( NameVersionHelper.GetDefaultName(orchestrationType), NameVersionHelper.GetDefaultVersion(orchestrationType), instanceId, - input); + input, + null, + null, + null); } /// @@ -91,8 +98,7 @@ public Task CreateOrchestrationInstanceAsync( /// OrchestrationInstance that represents the orchestration that was created public Task CreateOrchestrationInstanceAsync(string name, string version, object input) { - string instanceId = Guid.NewGuid().ToString("N"); - return CreateOrchestrationInstanceAsync(name, version, instanceId, input); + return InternalCreateOrchestrationInstanceWithRaisedEventAsync(name, version, null, input, null, null, null); } /// @@ -105,7 +111,7 @@ public Task CreateOrchestrationInstanceAsync(string name, /// OrchestrationInstance that represents the orchestration that was created public Task CreateOrchestrationInstanceAsync(string name, string version, string instanceId, object input) { - return CreateOrchestrationInstanceAsync(name, version, instanceId, input, null); + return InternalCreateOrchestrationInstanceWithRaisedEventAsync(name, version, instanceId, input, null, null, null); } /// @@ -117,42 +123,257 @@ public Task CreateOrchestrationInstanceAsync(string name, /// Input parameter to the specified TaskOrchestration /// Dictionary of key/value tags associated with this instance /// OrchestrationInstance that represents the orchestration that was created - public async Task CreateOrchestrationInstanceAsync( + public Task CreateOrchestrationInstanceAsync( string name, string version, string instanceId, object input, IDictionary tags) { - if (string.IsNullOrWhiteSpace(instanceId)) + return InternalCreateOrchestrationInstanceWithRaisedEventAsync(name, version, instanceId, input, tags, null, null); + } + + /// + /// Creates an orchestration instance, and raises an event for it, which eventually causes the OnEvent() method in the + /// orchestration to fire. + /// + /// Type that derives from TaskOrchestration + /// Input parameter to the specified TaskOrchestration + /// Name of the event + /// Data for the event + /// OrchestrationInstance that represents the orchestration that was created + public Task CreateOrchestrationInstanceWithRaisedEventAsync( + Type orchestrationType, + object orchestrationInput, + string eventName, + object eventData) + { + return InternalCreateOrchestrationInstanceWithRaisedEventAsync( + NameVersionHelper.GetDefaultName(orchestrationType), + NameVersionHelper.GetDefaultVersion(orchestrationType), + null, + orchestrationInput, + null, + eventName, + eventData); + } + + /// + /// Creates an orchestration instance, and raises an event for it, which eventually causes the OnEvent() method in the + /// orchestration to fire. + /// + /// Type that derives from TaskOrchestration + /// Instance id for the orchestration to be created, must be unique across the Task Hub + /// Input parameter to the specified TaskOrchestration + /// Name of the event + /// Data for the event + /// OrchestrationInstance that represents the orchestration that was created + public Task CreateOrchestrationInstanceWithRaisedEventAsync( + Type orchestrationType, + string instanceId, + object orchestrationInput, + string eventName, + object eventData) + { + return InternalCreateOrchestrationInstanceWithRaisedEventAsync( + NameVersionHelper.GetDefaultName(orchestrationType), + NameVersionHelper.GetDefaultVersion(orchestrationType), + instanceId, + orchestrationInput, + null, + eventName, + eventData); + } + + /// + /// Creates an orchestration instance, and raises an event for it, which eventually causes the OnEvent() method in the + /// orchestration to fire. + /// + /// Name of the orchestration as specified by the ObjectCreator + /// Name of the orchestration as specified by the ObjectCreator + /// Name of the event + /// Data for the event + public Task CreateOrchestrationInstanceWithRaisedEventAsync( + string orchestrationName, + string orchestrationVersion, + string eventName, + object eventData) + { + return InternalCreateOrchestrationInstanceWithRaisedEventAsync( + orchestrationName, + orchestrationVersion, + null, + null, + null, eventName, eventData); + } + + /// + /// Creates an orchestration instance, and raises an event for it, which eventually causes the OnEvent() method in the + /// orchestration to fire. + /// + /// Name of the TaskOrchestration + /// Version of the TaskOrchestration + /// Input parameter to the specified TaskOrchestration + /// Name of the event + /// Data for the event + /// OrchestrationInstance that represents the orchestration that was created + public Task CreateOrchestrationInstanceWithRaisedEventAsync( + string orchestrationName, + string orchestrationVersion, + object orchestrationInput, + string eventName, + object eventData) + { + return InternalCreateOrchestrationInstanceWithRaisedEventAsync( + orchestrationName, + orchestrationVersion, + null, orchestrationInput, + null, + eventName, + eventData); + } + + /// + /// Creates an orchestration instance, and raises an event for it, which eventually causes the OnEvent() method in the + /// orchestration to fire. + /// + /// Name of the TaskOrchestration + /// Version of the TaskOrchestration + /// Instance id for the orchestration to be created, must be unique across the Task Hub + /// Input parameter to the specified TaskOrchestration + /// Name of the event + /// Data for the event + /// OrchestrationInstance that represents the orchestration that was created + public Task CreateOrchestrationInstanceWithRaisedEventAsync( + string orchestrationName, + string orchestrationVersion, + string instanceId, + object orchestrationInput, + string eventName, + object eventData) + { + return InternalCreateOrchestrationInstanceWithRaisedEventAsync( + orchestrationName, + orchestrationVersion, + instanceId, + orchestrationInput, null, + eventName, + eventData); + } + + /// + /// Creates an orchestration instance, and raises an event for it, which eventually causes the OnEvent() method in the + /// orchestration to fire. + /// + /// Name of the TaskOrchestration + /// Version of the TaskOrchestration + /// Instance id for the orchestration to be created, must be unique across the Task Hub + /// Input parameter to the specified TaskOrchestration + /// Dictionary of key/value tags associated with this instance + /// Name of the event + /// Data for the event + /// OrchestrationInstance that represents the orchestration that was created + public Task CreateOrchestrationInstanceWithRaisedEventAsync( + string orchestrationName, + string orchestrationVersion, + string instanceId, + object orchestrationInput, + IDictionary orchestrationTags, + string eventName, + object eventData) + { + return InternalCreateOrchestrationInstanceWithRaisedEventAsync( + orchestrationName, + orchestrationVersion, + instanceId, + orchestrationInput, + orchestrationTags, + eventName, + eventData); + } + + /// + /// Creates an orchestration instance, and raises an event for it, which eventually causes the OnEvent() method in the + /// orchestration to fire. + /// + /// Name of the orchestration as specified by the ObjectCreator + /// Name of the orchestration as specified by the ObjectCreator + /// Instance id for the orchestration to be created, must be unique across the Task Hub + /// Name of the event + /// Data for the event + public Task CreateOrchestrationInstanceWithRaisedEventAsync( + string orchestrationName, + string orchestrationVersion, + string instanceId, + string eventName, + object eventData) + { + return InternalCreateOrchestrationInstanceWithRaisedEventAsync( + orchestrationName, + orchestrationVersion, + instanceId, + null, + null, eventName, eventData); + } + + async Task InternalCreateOrchestrationInstanceWithRaisedEventAsync( + string orchestrationName, + string orchestrationVersion, + string orchestrationInstanceId, + object orchestrationInput, + IDictionary orchestrationTags, + string eventName, + object eventData) + { + if (string.IsNullOrWhiteSpace(orchestrationInstanceId)) { - instanceId = Guid.NewGuid().ToString("N"); + orchestrationInstanceId = Guid.NewGuid().ToString("N"); } var orchestrationInstance = new OrchestrationInstance { - InstanceId = instanceId, + InstanceId = orchestrationInstanceId, ExecutionId = Guid.NewGuid().ToString("N"), }; - string serializedInput = defaultConverter.Serialize(input); - - var startedEvent = new ExecutionStartedEvent(-1, serializedInput) + string serializedOrchestrationData = defaultConverter.Serialize(orchestrationInput); + var startedEvent = new ExecutionStartedEvent(-1, serializedOrchestrationData) { - Tags = tags, - Name = name, - Version = version, + Tags = orchestrationTags, + Name = orchestrationName, + Version = orchestrationVersion, OrchestrationInstance = orchestrationInstance }; - var taskMessage = new TaskMessage + var taskMessages = new List { - OrchestrationInstance = orchestrationInstance, - Event = startedEvent + new TaskMessage + { + OrchestrationInstance = orchestrationInstance, + Event = startedEvent + } }; - await this.serviceClient.CreateTaskOrchestrationAsync(taskMessage); + if (eventData != null) + { + string serializedEventData = defaultConverter.Serialize(eventData); + taskMessages.Add(new TaskMessage + { + OrchestrationInstance = new OrchestrationInstance + { + InstanceId = orchestrationInstanceId, + + // to ensure that the event gets raised on the running + // orchestration instance, null the execution id + // so that it will find out which execution + // it should use for processing + ExecutionId = null + }, + Event = new EventRaisedEvent(-1, serializedEventData) {Name = eventName} + }); + } + await this.serviceClient.SendTaskOrchestrationMessageBatchAsync(taskMessages.ToArray()); return orchestrationInstance; } diff --git a/test/DurableTask.Framework.Tests/Mocks/LocalOrchestrationService.cs b/test/DurableTask.Framework.Tests/Mocks/LocalOrchestrationService.cs index cd5d46197..f0f0aca16 100644 --- a/test/DurableTask.Framework.Tests/Mocks/LocalOrchestrationService.cs +++ b/test/DurableTask.Framework.Tests/Mocks/LocalOrchestrationService.cs @@ -191,7 +191,16 @@ public Task CreateTaskOrchestrationAsync(TaskMessage creationMessage) public Task SendTaskOrchestrationMessageAsync(TaskMessage message) { - this.orchestratorQueue.SendMessage(message); + return SendTaskOrchestrationMessageBatchAsync(message); + } + + public Task SendTaskOrchestrationMessageBatchAsync(params TaskMessage[] messages) + { + foreach (var message in messages) + { + this.orchestratorQueue.SendMessage(message); + } + return Task.FromResult(null); } diff --git a/tools/DurableTask.props b/tools/DurableTask.props index 6e03921ea..99bb25064 100644 --- a/tools/DurableTask.props +++ b/tools/DurableTask.props @@ -6,6 +6,7 @@ False False True + True