Skip to content

Commit

Permalink
Adding RaiseEvent to client for scalability, Allow creating queue wit…
Browse files Browse the repository at this point in the history
…h specific size (#70)

* Adding a method in the client that will both create an orchestration instance and send raise the event in a batch

* Ensuring an instance id is set, not sending tasks if no messages

* Adding ability to set the size of service bus queues at creation time

* Fixing exception when awaiting on a null Task

* Resolving pull request comments.

* Not validating event name when raising event as it can be null, but relying on presence of event data

* Creating a batch method for sending orchestration messages

* Treating warnings as errors
  • Loading branch information
matthawley authored and simonporter committed Dec 3, 2016
1 parent d817c90 commit 3b903f2
Show file tree
Hide file tree
Showing 8 changed files with 335 additions and 43 deletions.
8 changes: 8 additions & 0 deletions samples/DurableTask.Samples/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,14 @@ static void Main(string[] args)
case "Signal":
instance = taskHubClient.CreateOrchestrationInstanceAsync(typeof(SignalOrchestration), instanceId, null).Result;
break;
case "SignalAndRaise":
if (options.Parameters == null || options.Parameters.Length != 1)
{
throw new ArgumentException("parameters");
}

instance = taskHubClient.CreateOrchestrationInstanceWithRaisedEventAsync(typeof(SignalOrchestration), instanceId, null, options.Signal, options.Parameters[0]).Result;
break;
case "Replat":
instance = taskHubClient.CreateOrchestrationInstanceAsync(typeof(MigrateOrchestration), instanceId,
new MigrateOrchestrationData() { SubscriptionId = "03a1cd39-47ac-4a57-9ff5-a2c2a2a76088", IsDisabled = false }).Result;
Expand Down
2 changes: 1 addition & 1 deletion samples/DurableTask.Samples/Signal/SignalOrchestration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class SignalOrchestration : TaskOrchestration<string,string>
public override async Task<string> RunTask(OrchestrationContext context, string input)
{
string user = await WaitForSignal();
string greeting = await context.ScheduleTask<string>("DurableTaskSamples.Greetings.SendGreetingTask", string.Empty, user);
string greeting = await context.ScheduleTask<string>("DurableTask.Samples.Greetings.SendGreetingTask", string.Empty, user);
return greeting;
}

Expand Down
7 changes: 7 additions & 0 deletions src/DurableTask.Framework/IOrchestrationServiceClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ public interface IOrchestrationServiceClient
/// <returns></returns>
Task SendTaskOrchestrationMessageAsync(TaskMessage message);

/// <summary>
/// Send a new set of messages for an orchestration
/// </summary>
/// <param name="messages">Messages to send</param>
/// <returns></returns>
Task SendTaskOrchestrationMessageBatchAsync(params TaskMessage[] messages);

/// <summary>
/// Wait for an orchestration to reach any terminal state within the given timeout
/// </summary>
Expand Down
81 changes: 61 additions & 20 deletions src/DurableTask.Framework/ServiceBusOrchestrationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -222,13 +222,13 @@ public async Task CreateAsync(bool recreateInstanceStore)
NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);

await Task.WhenAll(
this.SafeDeleteAndCreateQueueAsync(namespaceManager, orchestratorEntityName, true, true, Settings.MaxTaskOrchestrationDeliveryCount),
this.SafeDeleteAndCreateQueueAsync(namespaceManager, workerEntityName, false, false, Settings.MaxTaskActivityDeliveryCount)
this.SafeDeleteAndCreateQueueAsync(namespaceManager, orchestratorEntityName, true, true, Settings.MaxTaskOrchestrationDeliveryCount, Settings.MaxQueueSizeInMegabytes),
this.SafeDeleteAndCreateQueueAsync(namespaceManager, workerEntityName, false, false, Settings.MaxTaskActivityDeliveryCount, Settings.MaxQueueSizeInMegabytes)
);

if (InstanceStore != null)
{
await this.SafeDeleteAndCreateQueueAsync(namespaceManager, trackingEntityName, true, false, Settings.MaxTrackingDeliveryCount);
await this.SafeDeleteAndCreateQueueAsync(namespaceManager, trackingEntityName, true, false, Settings.MaxTrackingDeliveryCount, Settings.MaxQueueSizeInMegabytes);
await InstanceStore.InitializeStoreAsync(recreateInstanceStore);
}
}
Expand All @@ -241,12 +241,12 @@ public async Task CreateIfNotExistsAsync()
NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);

await Task.WhenAll(
SafeCreateQueueAsync(namespaceManager, orchestratorEntityName, true, true, Settings.MaxTaskOrchestrationDeliveryCount),
SafeCreateQueueAsync(namespaceManager, workerEntityName, false, false, Settings.MaxTaskActivityDeliveryCount)
SafeCreateQueueAsync(namespaceManager, orchestratorEntityName, true, true, Settings.MaxTaskOrchestrationDeliveryCount, Settings.MaxQueueSizeInMegabytes),
SafeCreateQueueAsync(namespaceManager, workerEntityName, false, false, Settings.MaxTaskActivityDeliveryCount, Settings.MaxQueueSizeInMegabytes)
);
if (InstanceStore != null)
{
await SafeCreateQueueAsync(namespaceManager, trackingEntityName, true, false, Settings.MaxTrackingDeliveryCount);
await SafeCreateQueueAsync(namespaceManager, trackingEntityName, true, false, Settings.MaxTrackingDeliveryCount, Settings.MaxQueueSizeInMegabytes);
await InstanceStore.InitializeStoreAsync(false);
}
}
Expand Down Expand Up @@ -902,7 +902,10 @@ public Task AbandonTaskActivityWorkItemAsync(TaskActivityWorkItem workItem)
{
var message = GetAndDeleteBrokeredMessageForWorkItem(workItem);
TraceHelper.Trace(TraceEventType.Information, $"Abandoning message {workItem?.Id}");
return message?.AbandonAsync();

return message == null
? Task.FromResult<object>(null)
: message.AbandonAsync();
}

/// <summary>
Expand Down Expand Up @@ -969,12 +972,41 @@ public async Task UpdateJumpStartStoreAsync(TaskMessage creationMessage)
}

/// <summary>
/// Send an orchestration message
/// Sends an orchestration message
/// </summary>
/// <param name="message">The task message to be sent for the orchestration</param>
public async Task SendTaskOrchestrationMessageAsync(TaskMessage message)
public Task SendTaskOrchestrationMessageAsync(TaskMessage message)
{
BrokeredMessage brokeredMessage = await ServiceBusUtils.GetBrokeredMessageFromObjectAsync(
return SendTaskOrchestrationMessageBatchAsync(message);
}

/// <summary>
/// Sends a set of orchestration messages
/// </summary>
/// <param name="messages">The task messages to be sent for the orchestration</param>
public async Task SendTaskOrchestrationMessageBatchAsync(params TaskMessage[] messages)
{
if (messages.Length == 0)
{
return;
}

var tasks = new Task<BrokeredMessage>[messages.Length];
for (int i = 0; i < messages.Length; i++)
{
tasks[i] = GetBrokeredMessageAsync(messages[i]);
}

var brokeredMessages = await Task.WhenAll(tasks);

MessageSender sender = await messagingFactory.CreateMessageSenderAsync(orchestratorEntityName).ConfigureAwait(false);
await sender.SendBatchAsync(brokeredMessages).ConfigureAwait(false);
await sender.CloseAsync().ConfigureAwait(false);
}

async Task<BrokeredMessage> GetBrokeredMessageAsync(TaskMessage message)
{
var brokeredMessage = await ServiceBusUtils.GetBrokeredMessageFromObjectAsync(
message,
Settings.MessageCompressionSettings,
Settings.MessageSettings,
Expand All @@ -987,12 +1019,10 @@ public async Task SendTaskOrchestrationMessageAsync(TaskMessage message)
var executionStartedEvent = message.Event as ExecutionStartedEvent;
if (executionStartedEvent != null)
{
brokeredMessage.MessageId = string.Format(CultureInfo.InvariantCulture, $"{executionStartedEvent.OrchestrationInstance.InstanceId}_{executionStartedEvent.OrchestrationInstance.ExecutionId}");
brokeredMessage.MessageId = $"{executionStartedEvent.OrchestrationInstance.InstanceId}_{executionStartedEvent.OrchestrationInstance.ExecutionId}";
}

MessageSender sender = await messagingFactory.CreateMessageSenderAsync(orchestratorEntityName).ConfigureAwait(false);
await sender.SendAsync(brokeredMessage).ConfigureAwait(false);
await sender.CloseAsync().ConfigureAwait(false);
return brokeredMessage;
}

/// <summary>
Expand Down Expand Up @@ -1471,13 +1501,14 @@ async Task SafeCreateQueueAsync(
string path,
bool requiresSessions,
bool requiresDuplicateDetection,
int maxDeliveryCount)
int maxDeliveryCount,
long maxSizeInMegabytes)
{
await Utils.ExecuteWithRetries(async () =>
{
try
{
await CreateQueueAsync(namespaceManager, path, requiresSessions, requiresDuplicateDetection, maxDeliveryCount);
await CreateQueueAsync(namespaceManager, path, requiresSessions, requiresDuplicateDetection, maxDeliveryCount, maxSizeInMegabytes);
}
catch (MessagingEntityAlreadyExistsException)
{
Expand All @@ -1491,25 +1522,35 @@ async Task SafeDeleteAndCreateQueueAsync(
string path,
bool requiresSessions,
bool requiresDuplicateDetection,
int maxDeliveryCount)
int maxDeliveryCount,
long maxSizeInMegabytes)
{
await SafeDeleteQueueAsync(namespaceManager, path);
await SafeCreateQueueAsync(namespaceManager, path, requiresSessions, requiresDuplicateDetection, maxDeliveryCount);
await SafeCreateQueueAsync(namespaceManager, path, requiresSessions, requiresDuplicateDetection, maxDeliveryCount, maxSizeInMegabytes);
}

static readonly long[] ValidQueueSizes = { 1024L, 2048L, 3072L, 4096L, 5120L };

async Task CreateQueueAsync(
NamespaceManager namespaceManager,
string path,
bool requiresSessions,
bool requiresDuplicateDetection,
int maxDeliveryCount)
int maxDeliveryCount,
long maxSizeInMegabytes)
{
if (!ValidQueueSizes.Contains(maxSizeInMegabytes))
{
throw new ArgumentException($"The specified value {maxSizeInMegabytes} is invalid for the maximum queue size in megabytes.\r\nIt must be one of the following values:\r\n{string.Join(";", ValidQueueSizes)}", nameof(maxSizeInMegabytes));
}

var description = new QueueDescription(path)
{
RequiresSession = requiresSessions,
MaxDeliveryCount = maxDeliveryCount,
RequiresDuplicateDetection = requiresDuplicateDetection,
DuplicateDetectionHistoryTimeWindow = TimeSpan.FromHours(DuplicateDetectionWindowInHours)
DuplicateDetectionHistoryTimeWindow = TimeSpan.FromHours(DuplicateDetectionWindowInHours),
MaxSizeInMegabytes = maxSizeInMegabytes
};

await namespaceManager.CreateQueueAsync(description);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ public ServiceBusOrchestrationServiceSettings()
/// </summary>
public int MaxTrackingDeliveryCount { get; set; }

/// <summary>
/// Maximum queue size, in megabytes, for the service bus queues
/// </summary>
public long MaxQueueSizeInMegabytes { get; set; } = 1024L;

/// <summary>
/// Gets the message prefetch count
/// </summary>
Expand Down
Loading

0 comments on commit 3b903f2

Please sign in to comment.