Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding RaiseEvent to client for scalability, Allow creating queue with specific size #70

Merged
merged 8 commits into from
Dec 3, 2016
8 changes: 8 additions & 0 deletions samples/DurableTask.Samples/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,14 @@ static void Main(string[] args)
case "Signal":
instance = taskHubClient.CreateOrchestrationInstanceAsync(typeof(SignalOrchestration), instanceId, null).Result;
break;
case "SignalAndRaise":
if (options.Parameters == null || options.Parameters.Length != 1)
{
throw new ArgumentException("parameters");
}

instance = taskHubClient.CreateOrchestrationInstanceWithRaisedEventAsync(typeof(SignalOrchestration), instanceId, null, options.Signal, options.Parameters[0]).Result;
break;
case "Replat":
instance = taskHubClient.CreateOrchestrationInstanceAsync(typeof(MigrateOrchestration), instanceId,
new MigrateOrchestrationData() { SubscriptionId = "03a1cd39-47ac-4a57-9ff5-a2c2a2a76088", IsDisabled = false }).Result;
Expand Down
2 changes: 1 addition & 1 deletion samples/DurableTask.Samples/Signal/SignalOrchestration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class SignalOrchestration : TaskOrchestration<string,string>
public override async Task<string> RunTask(OrchestrationContext context, string input)
{
string user = await WaitForSignal();
string greeting = await context.ScheduleTask<string>("DurableTaskSamples.Greetings.SendGreetingTask", string.Empty, user);
string greeting = await context.ScheduleTask<string>("DurableTask.Samples.Greetings.SendGreetingTask", string.Empty, user);
return greeting;
}

Expand Down
7 changes: 7 additions & 0 deletions src/DurableTask.Framework/IOrchestrationServiceClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ public interface IOrchestrationServiceClient
/// <returns></returns>
Task SendTaskOrchestrationMessageAsync(TaskMessage message);

/// <summary>
/// Send a new set of messages for an orchestration
/// </summary>
/// <param name="messages">Messages to send</param>
/// <returns></returns>
Task SendTaskOrchestrationMessageBatchAsync(params TaskMessage[] messages);

/// <summary>
/// Wait for an orchestration to reach any terminal state within the given timeout
/// </summary>
Expand Down
81 changes: 61 additions & 20 deletions src/DurableTask.Framework/ServiceBusOrchestrationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -222,13 +222,13 @@ public async Task CreateAsync(bool recreateInstanceStore)
NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);

await Task.WhenAll(
this.SafeDeleteAndCreateQueueAsync(namespaceManager, orchestratorEntityName, true, true, Settings.MaxTaskOrchestrationDeliveryCount),
this.SafeDeleteAndCreateQueueAsync(namespaceManager, workerEntityName, false, false, Settings.MaxTaskActivityDeliveryCount)
this.SafeDeleteAndCreateQueueAsync(namespaceManager, orchestratorEntityName, true, true, Settings.MaxTaskOrchestrationDeliveryCount, Settings.MaxQueueSizeInMegabytes),
this.SafeDeleteAndCreateQueueAsync(namespaceManager, workerEntityName, false, false, Settings.MaxTaskActivityDeliveryCount, Settings.MaxQueueSizeInMegabytes)
);

if (InstanceStore != null)
{
await this.SafeDeleteAndCreateQueueAsync(namespaceManager, trackingEntityName, true, false, Settings.MaxTrackingDeliveryCount);
await this.SafeDeleteAndCreateQueueAsync(namespaceManager, trackingEntityName, true, false, Settings.MaxTrackingDeliveryCount, Settings.MaxQueueSizeInMegabytes);
await InstanceStore.InitializeStoreAsync(recreateInstanceStore);
}
}
Expand All @@ -241,12 +241,12 @@ public async Task CreateIfNotExistsAsync()
NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);

await Task.WhenAll(
SafeCreateQueueAsync(namespaceManager, orchestratorEntityName, true, true, Settings.MaxTaskOrchestrationDeliveryCount),
SafeCreateQueueAsync(namespaceManager, workerEntityName, false, false, Settings.MaxTaskActivityDeliveryCount)
SafeCreateQueueAsync(namespaceManager, orchestratorEntityName, true, true, Settings.MaxTaskOrchestrationDeliveryCount, Settings.MaxQueueSizeInMegabytes),
SafeCreateQueueAsync(namespaceManager, workerEntityName, false, false, Settings.MaxTaskActivityDeliveryCount, Settings.MaxQueueSizeInMegabytes)
);
if (InstanceStore != null)
{
await SafeCreateQueueAsync(namespaceManager, trackingEntityName, true, false, Settings.MaxTrackingDeliveryCount);
await SafeCreateQueueAsync(namespaceManager, trackingEntityName, true, false, Settings.MaxTrackingDeliveryCount, Settings.MaxQueueSizeInMegabytes);
await InstanceStore.InitializeStoreAsync(false);
}
}
Expand Down Expand Up @@ -902,7 +902,10 @@ public Task AbandonTaskActivityWorkItemAsync(TaskActivityWorkItem workItem)
{
var message = GetAndDeleteBrokeredMessageForWorkItem(workItem);
TraceHelper.Trace(TraceEventType.Information, $"Abandoning message {workItem?.Id}");
return message?.AbandonAsync();

return message == null
? Task.FromResult<object>(null)
: message.AbandonAsync();
}

/// <summary>
Expand Down Expand Up @@ -969,12 +972,41 @@ public async Task UpdateJumpStartStoreAsync(TaskMessage creationMessage)
}

/// <summary>
/// Send an orchestration message
/// Sends an orchestration message
/// </summary>
/// <param name="message">The task message to be sent for the orchestration</param>
public async Task SendTaskOrchestrationMessageAsync(TaskMessage message)
public Task SendTaskOrchestrationMessageAsync(TaskMessage message)
{
BrokeredMessage brokeredMessage = await ServiceBusUtils.GetBrokeredMessageFromObjectAsync(
return SendTaskOrchestrationMessageBatchAsync(message);
}

/// <summary>
/// Sends a set of orchestration messages
/// </summary>
/// <param name="messages">The task messages to be sent for the orchestration</param>
public async Task SendTaskOrchestrationMessageBatchAsync(params TaskMessage[] messages)
{
if (messages.Length == 0)
{
return;
}

var tasks = new Task<BrokeredMessage>[messages.Length];
for (int i = 0; i < messages.Length; i++)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GetBrokeredMessageFromObjectAsync could end up going to blob storage or another slow external storage so running this as tasks would be better so the latency is not sequential.

{
tasks[i] = GetBrokeredMessageAsync(messages[i]);
}

var brokeredMessages = await Task.WhenAll(tasks);

MessageSender sender = await messagingFactory.CreateMessageSenderAsync(orchestratorEntityName).ConfigureAwait(false);
await sender.SendBatchAsync(brokeredMessages).ConfigureAwait(false);
await sender.CloseAsync().ConfigureAwait(false);
}

async Task<BrokeredMessage> GetBrokeredMessageAsync(TaskMessage message)
{
var brokeredMessage = await ServiceBusUtils.GetBrokeredMessageFromObjectAsync(
message,
Settings.MessageCompressionSettings,
Settings.MessageSettings,
Expand All @@ -987,12 +1019,10 @@ public async Task SendTaskOrchestrationMessageAsync(TaskMessage message)
var executionStartedEvent = message.Event as ExecutionStartedEvent;
if (executionStartedEvent != null)
{
brokeredMessage.MessageId = string.Format(CultureInfo.InvariantCulture, $"{executionStartedEvent.OrchestrationInstance.InstanceId}_{executionStartedEvent.OrchestrationInstance.ExecutionId}");
brokeredMessage.MessageId = $"{executionStartedEvent.OrchestrationInstance.InstanceId}_{executionStartedEvent.OrchestrationInstance.ExecutionId}";
}

MessageSender sender = await messagingFactory.CreateMessageSenderAsync(orchestratorEntityName).ConfigureAwait(false);
await sender.SendAsync(brokeredMessage).ConfigureAwait(false);
await sender.CloseAsync().ConfigureAwait(false);
return brokeredMessage;
}

/// <summary>
Expand Down Expand Up @@ -1471,13 +1501,14 @@ async Task SafeCreateQueueAsync(
string path,
bool requiresSessions,
bool requiresDuplicateDetection,
int maxDeliveryCount)
int maxDeliveryCount,
long maxSizeInMegabytes)
{
await Utils.ExecuteWithRetries(async () =>
{
try
{
await CreateQueueAsync(namespaceManager, path, requiresSessions, requiresDuplicateDetection, maxDeliveryCount);
await CreateQueueAsync(namespaceManager, path, requiresSessions, requiresDuplicateDetection, maxDeliveryCount, maxSizeInMegabytes);
}
catch (MessagingEntityAlreadyExistsException)
{
Expand All @@ -1491,25 +1522,35 @@ async Task SafeDeleteAndCreateQueueAsync(
string path,
bool requiresSessions,
bool requiresDuplicateDetection,
int maxDeliveryCount)
int maxDeliveryCount,
long maxSizeInMegabytes)
{
await SafeDeleteQueueAsync(namespaceManager, path);
await SafeCreateQueueAsync(namespaceManager, path, requiresSessions, requiresDuplicateDetection, maxDeliveryCount);
await SafeCreateQueueAsync(namespaceManager, path, requiresSessions, requiresDuplicateDetection, maxDeliveryCount, maxSizeInMegabytes);
}

static readonly long[] ValidQueueSizes = { 1024L, 2048L, 3072L, 4096L, 5120L };

async Task CreateQueueAsync(
NamespaceManager namespaceManager,
string path,
bool requiresSessions,
bool requiresDuplicateDetection,
int maxDeliveryCount)
int maxDeliveryCount,
long maxSizeInMegabytes)
{
if (!ValidQueueSizes.Contains(maxSizeInMegabytes))
{
throw new ArgumentException($"The specified value {maxSizeInMegabytes} is invalid for the maximum queue size in megabytes.\r\nIt must be one of the following values:\r\n{string.Join(";", ValidQueueSizes)}", nameof(maxSizeInMegabytes));
}

var description = new QueueDescription(path)
{
RequiresSession = requiresSessions,
MaxDeliveryCount = maxDeliveryCount,
RequiresDuplicateDetection = requiresDuplicateDetection,
DuplicateDetectionHistoryTimeWindow = TimeSpan.FromHours(DuplicateDetectionWindowInHours)
DuplicateDetectionHistoryTimeWindow = TimeSpan.FromHours(DuplicateDetectionWindowInHours),
MaxSizeInMegabytes = maxSizeInMegabytes
};

await namespaceManager.CreateQueueAsync(description);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ public ServiceBusOrchestrationServiceSettings()
/// </summary>
public int MaxTrackingDeliveryCount { get; set; }

/// <summary>
/// Maximum queue size, in megabytes, for the service bus queues
/// </summary>
public long MaxQueueSizeInMegabytes { get; set; } = 1024L;

/// <summary>
/// Gets the message prefetch count
/// </summary>
Expand Down
Loading