From 93b1c34d908db7d9d19f133e906b13b1e6de6449 Mon Sep 17 00:00:00 2001 From: Matt Hawley Date: Wed, 16 Nov 2016 17:03:02 -0800 Subject: [PATCH 1/8] Adding a method in the client that will both create an orchestration instance and send raise the event in a batch --- .../IOrchestrationServiceClient.cs | 4 +- .../ServiceBusOrchestrationService.cs | 42 ++++++++------ src/DurableTask.Framework/TaskHubClient.cs | 58 +++++++++++++++++++ .../Mocks/LocalOrchestrationService.cs | 8 ++- 4 files changed, 90 insertions(+), 22 deletions(-) diff --git a/src/DurableTask.Framework/IOrchestrationServiceClient.cs b/src/DurableTask.Framework/IOrchestrationServiceClient.cs index 43027be0a..e04da3012 100644 --- a/src/DurableTask.Framework/IOrchestrationServiceClient.cs +++ b/src/DurableTask.Framework/IOrchestrationServiceClient.cs @@ -33,9 +33,9 @@ public interface IOrchestrationServiceClient /// /// Send a new message for an orchestration /// - /// Message to send + /// List of messages to send /// - Task SendTaskOrchestrationMessageAsync(TaskMessage message); + Task SendTaskOrchestrationMessageAsync(params TaskMessage[] messages); /// /// Wait for an orchestration to reach any terminal state within the given timeout diff --git a/src/DurableTask.Framework/ServiceBusOrchestrationService.cs b/src/DurableTask.Framework/ServiceBusOrchestrationService.cs index 5475f0357..28849d7b9 100644 --- a/src/DurableTask.Framework/ServiceBusOrchestrationService.cs +++ b/src/DurableTask.Framework/ServiceBusOrchestrationService.cs @@ -969,29 +969,35 @@ public async Task UpdateJumpStartStoreAsync(TaskMessage creationMessage) } /// - /// Send an orchestration message + /// Sends a set of orchestration messages /// - /// The task message to be sent for the orchestration - public async Task SendTaskOrchestrationMessageAsync(TaskMessage message) + /// The task message to be sent for the orchestration + public async Task SendTaskOrchestrationMessageAsync(params TaskMessage[] messages) { - BrokeredMessage brokeredMessage = await ServiceBusUtils.GetBrokeredMessageFromObjectAsync( - message, - Settings.MessageCompressionSettings, - Settings.MessageSettings, - message.OrchestrationInstance, - "SendTaskOrchestrationMessage", - this.BlobStore, - DateTime.MinValue); - - // Use duplicate detection of ExecutionStartedEvent by addin messageId - var executionStartedEvent = message.Event as ExecutionStartedEvent; - if (executionStartedEvent != null) - { - brokeredMessage.MessageId = string.Format(CultureInfo.InvariantCulture, $"{executionStartedEvent.OrchestrationInstance.InstanceId}_{executionStartedEvent.OrchestrationInstance.ExecutionId}"); + var brokeredMessages = new BrokeredMessage[messages.Length]; + for (int i = 0; i < messages.Length; i++) + { + var message = messages[i]; + + brokeredMessages[i] = await ServiceBusUtils.GetBrokeredMessageFromObjectAsync( + message, + Settings.MessageCompressionSettings, + Settings.MessageSettings, + message.OrchestrationInstance, + "SendTaskOrchestrationMessage", + this.BlobStore, + DateTime.MinValue); + + // Use duplicate detection of ExecutionStartedEvent by addin messageId + var executionStartedEvent = message.Event as ExecutionStartedEvent; + if (executionStartedEvent != null) + { + brokeredMessages[i].MessageId = string.Format(CultureInfo.InvariantCulture, $"{executionStartedEvent.OrchestrationInstance.InstanceId}_{executionStartedEvent.OrchestrationInstance.ExecutionId}"); + } } MessageSender sender = await messagingFactory.CreateMessageSenderAsync(orchestratorEntityName).ConfigureAwait(false); - await sender.SendAsync(brokeredMessage).ConfigureAwait(false); + await sender.SendBatchAsync(brokeredMessages).ConfigureAwait(false); await sender.CloseAsync().ConfigureAwait(false); } diff --git a/src/DurableTask.Framework/TaskHubClient.cs b/src/DurableTask.Framework/TaskHubClient.cs index 416a9fac1..e494a9ba4 100644 --- a/src/DurableTask.Framework/TaskHubClient.cs +++ b/src/DurableTask.Framework/TaskHubClient.cs @@ -156,6 +156,64 @@ public async Task CreateOrchestrationInstanceAsync( return orchestrationInstance; } + /// + /// Creates an orchestration instance, and raises an event for it, which eventually causes the OnEvent() method in the + /// orchestration to fire. + /// + /// Name of the orchestration as specified by the ObjectCreator + /// Name of the orchestration as specified by the ObjectCreator + /// Instance id for the orchestration to be created, must be unique across the Task Hub + /// Name of the event + /// Data for the event + public async Task RaiseEventAsync( + string orchestrationName, + string orchestrationVersion, + string instanceId, + string eventName, + object eventData) + { + var orchestrationInstance = new OrchestrationInstance + { + InstanceId = instanceId, + ExecutionId = Guid.NewGuid().ToString("N"), + }; + + var startedEvent = new ExecutionStartedEvent(-1, null) + { + Tags = null, + Name = orchestrationName, + Version = orchestrationVersion, + OrchestrationInstance = orchestrationInstance + }; + + string serializedInput = defaultConverter.Serialize(eventData); + + var taskMessages = new[] + { + new TaskMessage + { + OrchestrationInstance = orchestrationInstance, + Event = startedEvent + }, + new TaskMessage + { + OrchestrationInstance = new OrchestrationInstance + { + InstanceId = instanceId, + + // to ensure that the event gets raised on the running + // orchestration instance, null the execution id + // so that it will find out which execution + // it should use for processing + ExecutionId = null + }, + Event = new EventRaisedEvent(-1, serializedInput) {Name = eventName} + } + }; + + await this.serviceClient.SendTaskOrchestrationMessageAsync(taskMessages); + } + /// /// Raises an event in the specified orchestration instance, which eventually causes the OnEvent() method in the /// orchestration to fire. diff --git a/test/DurableTask.Framework.Tests/Mocks/LocalOrchestrationService.cs b/test/DurableTask.Framework.Tests/Mocks/LocalOrchestrationService.cs index cd5d46197..10b5b4237 100644 --- a/test/DurableTask.Framework.Tests/Mocks/LocalOrchestrationService.cs +++ b/test/DurableTask.Framework.Tests/Mocks/LocalOrchestrationService.cs @@ -189,9 +189,13 @@ public Task CreateTaskOrchestrationAsync(TaskMessage creationMessage) return Task.FromResult(null); } - public Task SendTaskOrchestrationMessageAsync(TaskMessage message) + public Task SendTaskOrchestrationMessageAsync(params TaskMessage[] messages) { - this.orchestratorQueue.SendMessage(message); + foreach (var message in messages) + { + this.orchestratorQueue.SendMessage(message); + } + return Task.FromResult(null); } From 8a1e8c0295b0738516417c28cb2d7c659472c005 Mon Sep 17 00:00:00 2001 From: Matt Hawley Date: Wed, 16 Nov 2016 17:07:10 -0800 Subject: [PATCH 2/8] Ensuring an instance id is set, not sending tasks if no messages --- src/DurableTask.Framework/ServiceBusOrchestrationService.cs | 5 +++++ src/DurableTask.Framework/TaskHubClient.cs | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/src/DurableTask.Framework/ServiceBusOrchestrationService.cs b/src/DurableTask.Framework/ServiceBusOrchestrationService.cs index 28849d7b9..5c2ad96d1 100644 --- a/src/DurableTask.Framework/ServiceBusOrchestrationService.cs +++ b/src/DurableTask.Framework/ServiceBusOrchestrationService.cs @@ -974,6 +974,11 @@ public async Task UpdateJumpStartStoreAsync(TaskMessage creationMessage) /// The task message to be sent for the orchestration public async Task SendTaskOrchestrationMessageAsync(params TaskMessage[] messages) { + if (messages.Length == 0) + { + return; + } + var brokeredMessages = new BrokeredMessage[messages.Length]; for (int i = 0; i < messages.Length; i++) { diff --git a/src/DurableTask.Framework/TaskHubClient.cs b/src/DurableTask.Framework/TaskHubClient.cs index e494a9ba4..56aa383a0 100644 --- a/src/DurableTask.Framework/TaskHubClient.cs +++ b/src/DurableTask.Framework/TaskHubClient.cs @@ -172,6 +172,11 @@ public async Task RaiseEventAsync( string eventName, object eventData) { + if (string.IsNullOrWhiteSpace(instanceId)) + { + instanceId = Guid.NewGuid().ToString("N"); + } + var orchestrationInstance = new OrchestrationInstance { InstanceId = instanceId, From bae60f44a3fba6cf4b316e7206e4150e8cb79073 Mon Sep 17 00:00:00 2001 From: Matt Hawley Date: Thu, 17 Nov 2016 10:23:35 -0800 Subject: [PATCH 3/8] Adding ability to set the size of service bus queues at creation time --- .../ServiceBusOrchestrationService.cs | 37 ++++++++++++------- .../ServiceBusOrchestrationServiceSettings.cs | 5 +++ 2 files changed, 29 insertions(+), 13 deletions(-) diff --git a/src/DurableTask.Framework/ServiceBusOrchestrationService.cs b/src/DurableTask.Framework/ServiceBusOrchestrationService.cs index 5c2ad96d1..9097d658b 100644 --- a/src/DurableTask.Framework/ServiceBusOrchestrationService.cs +++ b/src/DurableTask.Framework/ServiceBusOrchestrationService.cs @@ -222,13 +222,13 @@ public async Task CreateAsync(bool recreateInstanceStore) NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString); await Task.WhenAll( - this.SafeDeleteAndCreateQueueAsync(namespaceManager, orchestratorEntityName, true, true, Settings.MaxTaskOrchestrationDeliveryCount), - this.SafeDeleteAndCreateQueueAsync(namespaceManager, workerEntityName, false, false, Settings.MaxTaskActivityDeliveryCount) + this.SafeDeleteAndCreateQueueAsync(namespaceManager, orchestratorEntityName, true, true, Settings.MaxTaskOrchestrationDeliveryCount, Settings.MaxQueueSizeInMegabytes), + this.SafeDeleteAndCreateQueueAsync(namespaceManager, workerEntityName, false, false, Settings.MaxTaskActivityDeliveryCount, Settings.MaxQueueSizeInMegabytes) ); if (InstanceStore != null) { - await this.SafeDeleteAndCreateQueueAsync(namespaceManager, trackingEntityName, true, false, Settings.MaxTrackingDeliveryCount); + await this.SafeDeleteAndCreateQueueAsync(namespaceManager, trackingEntityName, true, false, Settings.MaxTrackingDeliveryCount, Settings.MaxQueueSizeInMegabytes); await InstanceStore.InitializeStoreAsync(recreateInstanceStore); } } @@ -241,12 +241,12 @@ public async Task CreateIfNotExistsAsync() NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString); await Task.WhenAll( - SafeCreateQueueAsync(namespaceManager, orchestratorEntityName, true, true, Settings.MaxTaskOrchestrationDeliveryCount), - SafeCreateQueueAsync(namespaceManager, workerEntityName, false, false, Settings.MaxTaskActivityDeliveryCount) + SafeCreateQueueAsync(namespaceManager, orchestratorEntityName, true, true, Settings.MaxTaskOrchestrationDeliveryCount, Settings.MaxQueueSizeInMegabytes), + SafeCreateQueueAsync(namespaceManager, workerEntityName, false, false, Settings.MaxTaskActivityDeliveryCount, Settings.MaxQueueSizeInMegabytes) ); if (InstanceStore != null) { - await SafeCreateQueueAsync(namespaceManager, trackingEntityName, true, false, Settings.MaxTrackingDeliveryCount); + await SafeCreateQueueAsync(namespaceManager, trackingEntityName, true, false, Settings.MaxTrackingDeliveryCount, Settings.MaxQueueSizeInMegabytes); await InstanceStore.InitializeStoreAsync(false); } } @@ -997,7 +997,7 @@ public async Task SendTaskOrchestrationMessageAsync(params TaskMessage[] message var executionStartedEvent = message.Event as ExecutionStartedEvent; if (executionStartedEvent != null) { - brokeredMessages[i].MessageId = string.Format(CultureInfo.InvariantCulture, $"{executionStartedEvent.OrchestrationInstance.InstanceId}_{executionStartedEvent.OrchestrationInstance.ExecutionId}"); + brokeredMessages[i].MessageId = $"{executionStartedEvent.OrchestrationInstance.InstanceId}_{executionStartedEvent.OrchestrationInstance.ExecutionId}"; } } @@ -1482,13 +1482,14 @@ async Task SafeCreateQueueAsync( string path, bool requiresSessions, bool requiresDuplicateDetection, - int maxDeliveryCount) + int maxDeliveryCount, + long maxSizeInMegabytes) { await Utils.ExecuteWithRetries(async () => { try { - await CreateQueueAsync(namespaceManager, path, requiresSessions, requiresDuplicateDetection, maxDeliveryCount); + await CreateQueueAsync(namespaceManager, path, requiresSessions, requiresDuplicateDetection, maxDeliveryCount, maxSizeInMegabytes); } catch (MessagingEntityAlreadyExistsException) { @@ -1502,25 +1503,35 @@ async Task SafeDeleteAndCreateQueueAsync( string path, bool requiresSessions, bool requiresDuplicateDetection, - int maxDeliveryCount) + int maxDeliveryCount, + long maxSizeInMegabytes) { await SafeDeleteQueueAsync(namespaceManager, path); - await SafeCreateQueueAsync(namespaceManager, path, requiresSessions, requiresDuplicateDetection, maxDeliveryCount); + await SafeCreateQueueAsync(namespaceManager, path, requiresSessions, requiresDuplicateDetection, maxDeliveryCount, maxSizeInMegabytes); } + private static readonly long[] ValidQueueSizes = { 1024L, 2048L, 3072L, 4096L, 5120L }; + async Task CreateQueueAsync( NamespaceManager namespaceManager, string path, bool requiresSessions, bool requiresDuplicateDetection, - int maxDeliveryCount) + int maxDeliveryCount, + long maxSizeInMegabytes) { + if (!ValidQueueSizes.Contains(maxSizeInMegabytes)) + { + throw new ArgumentException($"The specified value {maxSizeInMegabytes} is invalid for the maximum queue size in megabytes.\r\nIt must be one of the following values:\r\n{string.Join(";", ValidQueueSizes)}", nameof(maxSizeInMegabytes)); + } + var description = new QueueDescription(path) { RequiresSession = requiresSessions, MaxDeliveryCount = maxDeliveryCount, RequiresDuplicateDetection = requiresDuplicateDetection, - DuplicateDetectionHistoryTimeWindow = TimeSpan.FromHours(DuplicateDetectionWindowInHours) + DuplicateDetectionHistoryTimeWindow = TimeSpan.FromHours(DuplicateDetectionWindowInHours), + MaxSizeInMegabytes = maxSizeInMegabytes }; await namespaceManager.CreateQueueAsync(description); diff --git a/src/DurableTask.Framework/Settings/ServiceBusOrchestrationServiceSettings.cs b/src/DurableTask.Framework/Settings/ServiceBusOrchestrationServiceSettings.cs index dea64c422..93877d927 100644 --- a/src/DurableTask.Framework/Settings/ServiceBusOrchestrationServiceSettings.cs +++ b/src/DurableTask.Framework/Settings/ServiceBusOrchestrationServiceSettings.cs @@ -60,6 +60,11 @@ public ServiceBusOrchestrationServiceSettings() /// public int MaxTrackingDeliveryCount { get; set; } + /// + /// Maximum queue size, in megabytes, for the service bus queues + /// + public long MaxQueueSizeInMegabytes { get; set; } = 1024; + /// /// Gets the message prefetch count /// From f71d73cb0b9338a05b873d50dcfb103aae80033f Mon Sep 17 00:00:00 2001 From: Matt Hawley Date: Thu, 1 Dec 2016 09:43:02 -0800 Subject: [PATCH 4/8] Fixing exception when awaiting on a null Task --- src/DurableTask.Framework/ServiceBusOrchestrationService.cs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/DurableTask.Framework/ServiceBusOrchestrationService.cs b/src/DurableTask.Framework/ServiceBusOrchestrationService.cs index 9097d658b..3ef3ea5ed 100644 --- a/src/DurableTask.Framework/ServiceBusOrchestrationService.cs +++ b/src/DurableTask.Framework/ServiceBusOrchestrationService.cs @@ -902,7 +902,10 @@ public Task AbandonTaskActivityWorkItemAsync(TaskActivityWorkItem workItem) { var message = GetAndDeleteBrokeredMessageForWorkItem(workItem); TraceHelper.Trace(TraceEventType.Information, $"Abandoning message {workItem?.Id}"); - return message?.AbandonAsync(); + + return message == null + ? Task.FromResult(null) + : message.AbandonAsync(); } /// From 7d039b2a592aac8c3bcb53629c4909a45942bdbf Mon Sep 17 00:00:00 2001 From: Matt Hawley Date: Thu, 1 Dec 2016 15:51:40 -0800 Subject: [PATCH 5/8] Resolving pull request comments. --- samples/DurableTask.Samples/Program.cs | 8 + .../Signal/SignalOrchestration.cs | 2 +- .../ServiceBusOrchestrationService.cs | 45 +-- .../ServiceBusOrchestrationServiceSettings.cs | 2 +- src/DurableTask.Framework/TaskHubClient.cs | 262 ++++++++++++++---- 5 files changed, 246 insertions(+), 73 deletions(-) diff --git a/samples/DurableTask.Samples/Program.cs b/samples/DurableTask.Samples/Program.cs index c29c9f61d..5f9e08d79 100644 --- a/samples/DurableTask.Samples/Program.cs +++ b/samples/DurableTask.Samples/Program.cs @@ -104,6 +104,14 @@ static void Main(string[] args) case "Signal": instance = taskHubClient.CreateOrchestrationInstanceAsync(typeof(SignalOrchestration), instanceId, null).Result; break; + case "SignalAndRaise": + if (options.Parameters == null || options.Parameters.Length != 1) + { + throw new ArgumentException("parameters"); + } + + instance = taskHubClient.CreateOrchestrationInstanceWithRaisedEventAsync(typeof(SignalOrchestration), instanceId, null, options.Signal, options.Parameters[0]).Result; + break; case "Replat": instance = taskHubClient.CreateOrchestrationInstanceAsync(typeof(MigrateOrchestration), instanceId, new MigrateOrchestrationData() { SubscriptionId = "03a1cd39-47ac-4a57-9ff5-a2c2a2a76088", IsDisabled = false }).Result; diff --git a/samples/DurableTask.Samples/Signal/SignalOrchestration.cs b/samples/DurableTask.Samples/Signal/SignalOrchestration.cs index 60e434502..c9053764c 100644 --- a/samples/DurableTask.Samples/Signal/SignalOrchestration.cs +++ b/samples/DurableTask.Samples/Signal/SignalOrchestration.cs @@ -23,7 +23,7 @@ public class SignalOrchestration : TaskOrchestration public override async Task RunTask(OrchestrationContext context, string input) { string user = await WaitForSignal(); - string greeting = await context.ScheduleTask("DurableTaskSamples.Greetings.SendGreetingTask", string.Empty, user); + string greeting = await context.ScheduleTask("DurableTask.Samples.Greetings.SendGreetingTask", string.Empty, user); return greeting; } diff --git a/src/DurableTask.Framework/ServiceBusOrchestrationService.cs b/src/DurableTask.Framework/ServiceBusOrchestrationService.cs index 3ef3ea5ed..e746b78a7 100644 --- a/src/DurableTask.Framework/ServiceBusOrchestrationService.cs +++ b/src/DurableTask.Framework/ServiceBusOrchestrationService.cs @@ -982,33 +982,40 @@ public async Task SendTaskOrchestrationMessageAsync(params TaskMessage[] message return; } - var brokeredMessages = new BrokeredMessage[messages.Length]; + var tasks = new Task[messages.Length]; for (int i = 0; i < messages.Length; i++) { - var message = messages[i]; - - brokeredMessages[i] = await ServiceBusUtils.GetBrokeredMessageFromObjectAsync( - message, - Settings.MessageCompressionSettings, - Settings.MessageSettings, - message.OrchestrationInstance, - "SendTaskOrchestrationMessage", - this.BlobStore, - DateTime.MinValue); - - // Use duplicate detection of ExecutionStartedEvent by addin messageId - var executionStartedEvent = message.Event as ExecutionStartedEvent; - if (executionStartedEvent != null) - { - brokeredMessages[i].MessageId = $"{executionStartedEvent.OrchestrationInstance.InstanceId}_{executionStartedEvent.OrchestrationInstance.ExecutionId}"; - } + tasks[i] = GetBrokeredMessageAsync(messages[i]); } + var brokeredMessages = await Task.WhenAll(tasks); + MessageSender sender = await messagingFactory.CreateMessageSenderAsync(orchestratorEntityName).ConfigureAwait(false); await sender.SendBatchAsync(brokeredMessages).ConfigureAwait(false); await sender.CloseAsync().ConfigureAwait(false); } + async Task GetBrokeredMessageAsync(TaskMessage message) + { + var brokeredMessage = await ServiceBusUtils.GetBrokeredMessageFromObjectAsync( + message, + Settings.MessageCompressionSettings, + Settings.MessageSettings, + message.OrchestrationInstance, + "SendTaskOrchestrationMessage", + this.BlobStore, + DateTime.MinValue); + + // Use duplicate detection of ExecutionStartedEvent by addin messageId + var executionStartedEvent = message.Event as ExecutionStartedEvent; + if (executionStartedEvent != null) + { + brokeredMessage.MessageId = $"{executionStartedEvent.OrchestrationInstance.InstanceId}_{executionStartedEvent.OrchestrationInstance.ExecutionId}"; + } + + return brokeredMessage; + } + /// /// Force terminates an orchestration by sending a execution terminated event /// @@ -1513,7 +1520,7 @@ async Task SafeDeleteAndCreateQueueAsync( await SafeCreateQueueAsync(namespaceManager, path, requiresSessions, requiresDuplicateDetection, maxDeliveryCount, maxSizeInMegabytes); } - private static readonly long[] ValidQueueSizes = { 1024L, 2048L, 3072L, 4096L, 5120L }; + static readonly long[] ValidQueueSizes = { 1024L, 2048L, 3072L, 4096L, 5120L }; async Task CreateQueueAsync( NamespaceManager namespaceManager, diff --git a/src/DurableTask.Framework/Settings/ServiceBusOrchestrationServiceSettings.cs b/src/DurableTask.Framework/Settings/ServiceBusOrchestrationServiceSettings.cs index 93877d927..709d1bc4a 100644 --- a/src/DurableTask.Framework/Settings/ServiceBusOrchestrationServiceSettings.cs +++ b/src/DurableTask.Framework/Settings/ServiceBusOrchestrationServiceSettings.cs @@ -63,7 +63,7 @@ public ServiceBusOrchestrationServiceSettings() /// /// Maximum queue size, in megabytes, for the service bus queues /// - public long MaxQueueSizeInMegabytes { get; set; } = 1024; + public long MaxQueueSizeInMegabytes { get; set; } = 1024L; /// /// Gets the message prefetch count diff --git a/src/DurableTask.Framework/TaskHubClient.cs b/src/DurableTask.Framework/TaskHubClient.cs index 56aa383a0..a3300041a 100644 --- a/src/DurableTask.Framework/TaskHubClient.cs +++ b/src/DurableTask.Framework/TaskHubClient.cs @@ -57,10 +57,14 @@ public TaskHubClient(IOrchestrationServiceClient serviceClient) /// OrchestrationInstance that represents the orchestration that was created public Task CreateOrchestrationInstanceAsync(Type orchestrationType, object input) { - return CreateOrchestrationInstanceAsync( + return InternalCreateOrchestrationInstanceWithRaisedEventAsync( NameVersionHelper.GetDefaultName(orchestrationType), NameVersionHelper.GetDefaultVersion(orchestrationType), - input); + null, + input, + null, + null, + null); } /// @@ -75,11 +79,14 @@ public Task CreateOrchestrationInstanceAsync( string instanceId, object input) { - return CreateOrchestrationInstanceAsync( + return InternalCreateOrchestrationInstanceWithRaisedEventAsync( NameVersionHelper.GetDefaultName(orchestrationType), NameVersionHelper.GetDefaultVersion(orchestrationType), instanceId, - input); + input, + null, + null, + null); } /// @@ -91,8 +98,7 @@ public Task CreateOrchestrationInstanceAsync( /// OrchestrationInstance that represents the orchestration that was created public Task CreateOrchestrationInstanceAsync(string name, string version, object input) { - string instanceId = Guid.NewGuid().ToString("N"); - return CreateOrchestrationInstanceAsync(name, version, instanceId, input); + return InternalCreateOrchestrationInstanceWithRaisedEventAsync(name, version, null, input, null, null, null); } /// @@ -105,7 +111,7 @@ public Task CreateOrchestrationInstanceAsync(string name, /// OrchestrationInstance that represents the orchestration that was created public Task CreateOrchestrationInstanceAsync(string name, string version, string instanceId, object input) { - return CreateOrchestrationInstanceAsync(name, version, instanceId, input, null); + return InternalCreateOrchestrationInstanceWithRaisedEventAsync(name, version, instanceId, input, null, null, null); } /// @@ -117,43 +123,173 @@ public Task CreateOrchestrationInstanceAsync(string name, /// Input parameter to the specified TaskOrchestration /// Dictionary of key/value tags associated with this instance /// OrchestrationInstance that represents the orchestration that was created - public async Task CreateOrchestrationInstanceAsync( + public Task CreateOrchestrationInstanceAsync( string name, string version, string instanceId, object input, IDictionary tags) { - if (string.IsNullOrWhiteSpace(instanceId)) - { - instanceId = Guid.NewGuid().ToString("N"); - } + return InternalCreateOrchestrationInstanceWithRaisedEventAsync(name, version, instanceId, input, tags, null, null); + } - var orchestrationInstance = new OrchestrationInstance - { - InstanceId = instanceId, - ExecutionId = Guid.NewGuid().ToString("N"), - }; + /// + /// Creates an orchestration instance, and raises an event for it, which eventually causes the OnEvent() method in the + /// orchestration to fire. + /// + /// Type that derives from TaskOrchestration + /// Input parameter to the specified TaskOrchestration + /// Name of the event + /// Data for the event + /// OrchestrationInstance that represents the orchestration that was created + public Task CreateOrchestrationInstanceWithRaisedEventAsync( + Type orchestrationType, + object orchestrationInput, + string eventName, + object eventData) + { + return InternalCreateOrchestrationInstanceWithRaisedEventAsync( + NameVersionHelper.GetDefaultName(orchestrationType), + NameVersionHelper.GetDefaultVersion(orchestrationType), + null, + orchestrationInput, + null, + eventName, + eventData); + } - string serializedInput = defaultConverter.Serialize(input); + /// + /// Creates an orchestration instance, and raises an event for it, which eventually causes the OnEvent() method in the + /// orchestration to fire. + /// + /// Type that derives from TaskOrchestration + /// Instance id for the orchestration to be created, must be unique across the Task Hub + /// Input parameter to the specified TaskOrchestration + /// Name of the event + /// Data for the event + /// OrchestrationInstance that represents the orchestration that was created + public Task CreateOrchestrationInstanceWithRaisedEventAsync( + Type orchestrationType, + string instanceId, + object orchestrationInput, + string eventName, + object eventData) + { + return InternalCreateOrchestrationInstanceWithRaisedEventAsync( + NameVersionHelper.GetDefaultName(orchestrationType), + NameVersionHelper.GetDefaultVersion(orchestrationType), + instanceId, + orchestrationInput, + null, + eventName, + eventData); + } - var startedEvent = new ExecutionStartedEvent(-1, serializedInput) - { - Tags = tags, - Name = name, - Version = version, - OrchestrationInstance = orchestrationInstance - }; + /// + /// Creates an orchestration instance, and raises an event for it, which eventually causes the OnEvent() method in the + /// orchestration to fire. + /// + /// Name of the orchestration as specified by the ObjectCreator + /// Name of the orchestration as specified by the ObjectCreator + /// Name of the event + /// Data for the event + public Task CreateOrchestrationInstanceWithRaisedEventAsync( + string orchestrationName, + string orchestrationVersion, + string eventName, + object eventData) + { + return InternalCreateOrchestrationInstanceWithRaisedEventAsync( + orchestrationName, + orchestrationVersion, + null, + null, + null, eventName, eventData); + } - var taskMessage = new TaskMessage - { - OrchestrationInstance = orchestrationInstance, - Event = startedEvent - }; + /// + /// Creates an orchestration instance, and raises an event for it, which eventually causes the OnEvent() method in the + /// orchestration to fire. + /// + /// Name of the TaskOrchestration + /// Version of the TaskOrchestration + /// Input parameter to the specified TaskOrchestration + /// Name of the event + /// Data for the event + /// OrchestrationInstance that represents the orchestration that was created + public Task CreateOrchestrationInstanceWithRaisedEventAsync( + string orchestrationName, + string orchestrationVersion, + object orchestrationInput, + string eventName, + object eventData) + { + return InternalCreateOrchestrationInstanceWithRaisedEventAsync( + orchestrationName, + orchestrationVersion, + null, orchestrationInput, + null, + eventName, + eventData); + } - await this.serviceClient.CreateTaskOrchestrationAsync(taskMessage); + /// + /// Creates an orchestration instance, and raises an event for it, which eventually causes the OnEvent() method in the + /// orchestration to fire. + /// + /// Name of the TaskOrchestration + /// Version of the TaskOrchestration + /// Instance id for the orchestration to be created, must be unique across the Task Hub + /// Input parameter to the specified TaskOrchestration + /// Name of the event + /// Data for the event + /// OrchestrationInstance that represents the orchestration that was created + public Task CreateOrchestrationInstanceWithRaisedEventAsync( + string orchestrationName, + string orchestrationVersion, + string instanceId, + object orchestrationInput, + string eventName, + object eventData) + { + return InternalCreateOrchestrationInstanceWithRaisedEventAsync( + orchestrationName, + orchestrationVersion, + instanceId, + orchestrationInput, null, + eventName, + eventData); + } - return orchestrationInstance; + /// + /// Creates an orchestration instance, and raises an event for it, which eventually causes the OnEvent() method in the + /// orchestration to fire. + /// + /// Name of the TaskOrchestration + /// Version of the TaskOrchestration + /// Instance id for the orchestration to be created, must be unique across the Task Hub + /// Input parameter to the specified TaskOrchestration + /// Dictionary of key/value tags associated with this instance + /// Name of the event + /// Data for the event + /// OrchestrationInstance that represents the orchestration that was created + public Task CreateOrchestrationInstanceWithRaisedEventAsync( + string orchestrationName, + string orchestrationVersion, + string instanceId, + object orchestrationInput, + IDictionary orchestrationTags, + string eventName, + object eventData) + { + return InternalCreateOrchestrationInstanceWithRaisedEventAsync( + orchestrationName, + orchestrationVersion, + instanceId, + orchestrationInput, + orchestrationTags, + eventName, + eventData); } /// @@ -165,58 +301,80 @@ public async Task CreateOrchestrationInstanceAsync( /// Instance id for the orchestration to be created, must be unique across the Task Hub /// Name of the event /// Data for the event - public async Task RaiseEventAsync( + public Task CreateOrchestrationInstanceWithRaisedEventAsync( string orchestrationName, string orchestrationVersion, string instanceId, string eventName, object eventData) { - if (string.IsNullOrWhiteSpace(instanceId)) + return InternalCreateOrchestrationInstanceWithRaisedEventAsync( + orchestrationName, + orchestrationVersion, + instanceId, + null, + null, eventName, eventData); + } + + async Task InternalCreateOrchestrationInstanceWithRaisedEventAsync( + string orchestrationName, + string orchestrationVersion, + string orchestrationInstanceId, + object orchestrationInput, + IDictionary orchestrationTags, + string eventName, + object eventData) + { + if (string.IsNullOrWhiteSpace(orchestrationInstanceId)) { - instanceId = Guid.NewGuid().ToString("N"); + orchestrationInstanceId = Guid.NewGuid().ToString("N"); } var orchestrationInstance = new OrchestrationInstance { - InstanceId = instanceId, + InstanceId = orchestrationInstanceId, ExecutionId = Guid.NewGuid().ToString("N"), }; - var startedEvent = new ExecutionStartedEvent(-1, null) + string serializedOrchestrationData = defaultConverter.Serialize(orchestrationInput); + var startedEvent = new ExecutionStartedEvent(-1, serializedOrchestrationData) { - Tags = null, + Tags = orchestrationTags, Name = orchestrationName, Version = orchestrationVersion, OrchestrationInstance = orchestrationInstance }; - string serializedInput = defaultConverter.Serialize(eventData); - - var taskMessages = new[] + var taskMessages = new List { new TaskMessage { OrchestrationInstance = orchestrationInstance, Event = startedEvent - }, - new TaskMessage + } + }; + + if (!string.IsNullOrEmpty(eventName)) + { + string serializedEventData = defaultConverter.Serialize(eventData); + taskMessages.Add(new TaskMessage { OrchestrationInstance = new OrchestrationInstance { - InstanceId = instanceId, + InstanceId = orchestrationInstanceId, // to ensure that the event gets raised on the running - // orchestration instance, null the execution id - // so that it will find out which execution - // it should use for processing - ExecutionId = null + // orchestration instance, null the execution id + // so that it will find out which execution + // it should use for processing + ExecutionId = null }, - Event = new EventRaisedEvent(-1, serializedInput) {Name = eventName} - } - }; + Event = new EventRaisedEvent(-1, serializedEventData) {Name = eventName} + }); + } - await this.serviceClient.SendTaskOrchestrationMessageAsync(taskMessages); + await this.serviceClient.SendTaskOrchestrationMessageAsync(taskMessages.ToArray()); + return orchestrationInstance; } /// From 979fbf7bcaac091b9cccb27b1b6a35dfce375df8 Mon Sep 17 00:00:00 2001 From: Matt Hawley Date: Fri, 2 Dec 2016 11:29:23 -0800 Subject: [PATCH 6/8] Not validating event name when raising event as it can be null, but relying on presence of event data --- src/DurableTask.Framework/TaskHubClient.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/DurableTask.Framework/TaskHubClient.cs b/src/DurableTask.Framework/TaskHubClient.cs index a3300041a..9268cda28 100644 --- a/src/DurableTask.Framework/TaskHubClient.cs +++ b/src/DurableTask.Framework/TaskHubClient.cs @@ -354,7 +354,7 @@ async Task InternalCreateOrchestrationInstanceWithRaisedE } }; - if (!string.IsNullOrEmpty(eventName)) + if (eventData != null) { string serializedEventData = defaultConverter.Serialize(eventData); taskMessages.Add(new TaskMessage From 2a6687620f6f5ec1b1547fce1c838a353d663dc5 Mon Sep 17 00:00:00 2001 From: Matt Hawley Date: Fri, 2 Dec 2016 16:03:41 -0800 Subject: [PATCH 7/8] Creating a batch method for sending orchestration messages --- .../IOrchestrationServiceClient.cs | 11 +++++++++-- .../ServiceBusOrchestrationService.cs | 13 +++++++++++-- src/DurableTask.Framework/TaskHubClient.cs | 2 +- .../Mocks/LocalOrchestrationService.cs | 7 ++++++- 4 files changed, 27 insertions(+), 6 deletions(-) diff --git a/src/DurableTask.Framework/IOrchestrationServiceClient.cs b/src/DurableTask.Framework/IOrchestrationServiceClient.cs index e04da3012..ea4df4517 100644 --- a/src/DurableTask.Framework/IOrchestrationServiceClient.cs +++ b/src/DurableTask.Framework/IOrchestrationServiceClient.cs @@ -33,9 +33,16 @@ public interface IOrchestrationServiceClient /// /// Send a new message for an orchestration /// - /// List of messages to send + /// Message to send /// - Task SendTaskOrchestrationMessageAsync(params TaskMessage[] messages); + Task SendTaskOrchestrationMessageAsync(TaskMessage message); + + /// + /// Send a new set of messages for an orchestration + /// + /// Messages to send + /// + Task SendTaskOrchestrationMessageBatchAsync(params TaskMessage[] messages); /// /// Wait for an orchestration to reach any terminal state within the given timeout diff --git a/src/DurableTask.Framework/ServiceBusOrchestrationService.cs b/src/DurableTask.Framework/ServiceBusOrchestrationService.cs index e746b78a7..90c0c359a 100644 --- a/src/DurableTask.Framework/ServiceBusOrchestrationService.cs +++ b/src/DurableTask.Framework/ServiceBusOrchestrationService.cs @@ -971,11 +971,20 @@ public async Task UpdateJumpStartStoreAsync(TaskMessage creationMessage) await this.InstanceStore.WriteJumpStartEntitesAsync(new[] { jumpStartEntity }); } + /// + /// Sends an orchestration message + /// + /// The task message to be sent for the orchestration + public Task SendTaskOrchestrationMessageAsync(TaskMessage message) + { + return SendTaskOrchestrationMessageBatchAsync(message); + } + /// /// Sends a set of orchestration messages /// - /// The task message to be sent for the orchestration - public async Task SendTaskOrchestrationMessageAsync(params TaskMessage[] messages) + /// The task messages to be sent for the orchestration + public async Task SendTaskOrchestrationMessageBatchAsync(params TaskMessage[] messages) { if (messages.Length == 0) { diff --git a/src/DurableTask.Framework/TaskHubClient.cs b/src/DurableTask.Framework/TaskHubClient.cs index 9268cda28..97b51d765 100644 --- a/src/DurableTask.Framework/TaskHubClient.cs +++ b/src/DurableTask.Framework/TaskHubClient.cs @@ -373,7 +373,7 @@ async Task InternalCreateOrchestrationInstanceWithRaisedE }); } - await this.serviceClient.SendTaskOrchestrationMessageAsync(taskMessages.ToArray()); + await this.serviceClient.SendTaskOrchestrationMessageBatchAsync(taskMessages.ToArray()); return orchestrationInstance; } diff --git a/test/DurableTask.Framework.Tests/Mocks/LocalOrchestrationService.cs b/test/DurableTask.Framework.Tests/Mocks/LocalOrchestrationService.cs index 10b5b4237..f0f0aca16 100644 --- a/test/DurableTask.Framework.Tests/Mocks/LocalOrchestrationService.cs +++ b/test/DurableTask.Framework.Tests/Mocks/LocalOrchestrationService.cs @@ -189,7 +189,12 @@ public Task CreateTaskOrchestrationAsync(TaskMessage creationMessage) return Task.FromResult(null); } - public Task SendTaskOrchestrationMessageAsync(params TaskMessage[] messages) + public Task SendTaskOrchestrationMessageAsync(TaskMessage message) + { + return SendTaskOrchestrationMessageBatchAsync(message); + } + + public Task SendTaskOrchestrationMessageBatchAsync(params TaskMessage[] messages) { foreach (var message in messages) { From db8c73123c9fbafd3c5102fe76af12316802c56d Mon Sep 17 00:00:00 2001 From: Matt Hawley Date: Fri, 2 Dec 2016 16:10:15 -0800 Subject: [PATCH 8/8] Treating warnings as errors --- tools/DurableTask.props | 1 + 1 file changed, 1 insertion(+) diff --git a/tools/DurableTask.props b/tools/DurableTask.props index 6e03921ea..99bb25064 100644 --- a/tools/DurableTask.props +++ b/tools/DurableTask.props @@ -6,6 +6,7 @@ False False True + True