Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding RaiseEvent to client for scalability, Allow creating queue with specific size #70

Merged
merged 8 commits into from
Dec 3, 2016
4 changes: 2 additions & 2 deletions src/DurableTask.Framework/IOrchestrationServiceClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ public interface IOrchestrationServiceClient
/// <summary>
/// Send a new message for an orchestration
/// </summary>
/// <param name="message">Message to send</param>
/// <param name="messages">List of messages to send</param>
/// <returns></returns>
Task SendTaskOrchestrationMessageAsync(TaskMessage message);
Task SendTaskOrchestrationMessageAsync(params TaskMessage[] messages);

/// <summary>
/// Wait for an orchestration to reach any terminal state within the given timeout
Expand Down
83 changes: 54 additions & 29 deletions src/DurableTask.Framework/ServiceBusOrchestrationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -222,13 +222,13 @@ public async Task CreateAsync(bool recreateInstanceStore)
NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);

await Task.WhenAll(
this.SafeDeleteAndCreateQueueAsync(namespaceManager, orchestratorEntityName, true, true, Settings.MaxTaskOrchestrationDeliveryCount),
this.SafeDeleteAndCreateQueueAsync(namespaceManager, workerEntityName, false, false, Settings.MaxTaskActivityDeliveryCount)
this.SafeDeleteAndCreateQueueAsync(namespaceManager, orchestratorEntityName, true, true, Settings.MaxTaskOrchestrationDeliveryCount, Settings.MaxQueueSizeInMegabytes),
this.SafeDeleteAndCreateQueueAsync(namespaceManager, workerEntityName, false, false, Settings.MaxTaskActivityDeliveryCount, Settings.MaxQueueSizeInMegabytes)
);

if (InstanceStore != null)
{
await this.SafeDeleteAndCreateQueueAsync(namespaceManager, trackingEntityName, true, false, Settings.MaxTrackingDeliveryCount);
await this.SafeDeleteAndCreateQueueAsync(namespaceManager, trackingEntityName, true, false, Settings.MaxTrackingDeliveryCount, Settings.MaxQueueSizeInMegabytes);
await InstanceStore.InitializeStoreAsync(recreateInstanceStore);
}
}
Expand All @@ -241,12 +241,12 @@ public async Task CreateIfNotExistsAsync()
NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);

await Task.WhenAll(
SafeCreateQueueAsync(namespaceManager, orchestratorEntityName, true, true, Settings.MaxTaskOrchestrationDeliveryCount),
SafeCreateQueueAsync(namespaceManager, workerEntityName, false, false, Settings.MaxTaskActivityDeliveryCount)
SafeCreateQueueAsync(namespaceManager, orchestratorEntityName, true, true, Settings.MaxTaskOrchestrationDeliveryCount, Settings.MaxQueueSizeInMegabytes),
SafeCreateQueueAsync(namespaceManager, workerEntityName, false, false, Settings.MaxTaskActivityDeliveryCount, Settings.MaxQueueSizeInMegabytes)
);
if (InstanceStore != null)
{
await SafeCreateQueueAsync(namespaceManager, trackingEntityName, true, false, Settings.MaxTrackingDeliveryCount);
await SafeCreateQueueAsync(namespaceManager, trackingEntityName, true, false, Settings.MaxTrackingDeliveryCount, Settings.MaxQueueSizeInMegabytes);
await InstanceStore.InitializeStoreAsync(false);
}
}
Expand Down Expand Up @@ -902,7 +902,10 @@ public Task AbandonTaskActivityWorkItemAsync(TaskActivityWorkItem workItem)
{
var message = GetAndDeleteBrokeredMessageForWorkItem(workItem);
TraceHelper.Trace(TraceEventType.Information, $"Abandoning message {workItem?.Id}");
return message?.AbandonAsync();

return message == null
? Task.FromResult<object>(null)
: message.AbandonAsync();
}

/// <summary>
Expand Down Expand Up @@ -969,29 +972,40 @@ public async Task UpdateJumpStartStoreAsync(TaskMessage creationMessage)
}

/// <summary>
/// Send an orchestration message
/// Sends a set of orchestration messages
/// </summary>
/// <param name="message">The task message to be sent for the orchestration</param>
public async Task SendTaskOrchestrationMessageAsync(TaskMessage message)
/// <param name="messages">The task message to be sent for the orchestration</param>
public async Task SendTaskOrchestrationMessageAsync(params TaskMessage[] messages)
{
BrokeredMessage brokeredMessage = await ServiceBusUtils.GetBrokeredMessageFromObjectAsync(
message,
Settings.MessageCompressionSettings,
Settings.MessageSettings,
message.OrchestrationInstance,
"SendTaskOrchestrationMessage",
this.BlobStore,
DateTime.MinValue);
if (messages.Length == 0)
{
return;
}

// Use duplicate detection of ExecutionStartedEvent by addin messageId
var executionStartedEvent = message.Event as ExecutionStartedEvent;
if (executionStartedEvent != null)
var brokeredMessages = new BrokeredMessage[messages.Length];
for (int i = 0; i < messages.Length; i++)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GetBrokeredMessageFromObjectAsync could end up going to blob storage or another slow external storage so running this as tasks would be better so the latency is not sequential.

{
brokeredMessage.MessageId = string.Format(CultureInfo.InvariantCulture, $"{executionStartedEvent.OrchestrationInstance.InstanceId}_{executionStartedEvent.OrchestrationInstance.ExecutionId}");
var message = messages[i];

brokeredMessages[i] = await ServiceBusUtils.GetBrokeredMessageFromObjectAsync(
message,
Settings.MessageCompressionSettings,
Settings.MessageSettings,
message.OrchestrationInstance,
"SendTaskOrchestrationMessage",
this.BlobStore,
DateTime.MinValue);

// Use duplicate detection of ExecutionStartedEvent by addin messageId
var executionStartedEvent = message.Event as ExecutionStartedEvent;
if (executionStartedEvent != null)
{
brokeredMessages[i].MessageId = $"{executionStartedEvent.OrchestrationInstance.InstanceId}_{executionStartedEvent.OrchestrationInstance.ExecutionId}";
}
}

MessageSender sender = await messagingFactory.CreateMessageSenderAsync(orchestratorEntityName).ConfigureAwait(false);
await sender.SendAsync(brokeredMessage).ConfigureAwait(false);
await sender.SendBatchAsync(brokeredMessages).ConfigureAwait(false);
await sender.CloseAsync().ConfigureAwait(false);
}

Expand Down Expand Up @@ -1471,13 +1485,14 @@ async Task SafeCreateQueueAsync(
string path,
bool requiresSessions,
bool requiresDuplicateDetection,
int maxDeliveryCount)
int maxDeliveryCount,
long maxSizeInMegabytes)
{
await Utils.ExecuteWithRetries(async () =>
{
try
{
await CreateQueueAsync(namespaceManager, path, requiresSessions, requiresDuplicateDetection, maxDeliveryCount);
await CreateQueueAsync(namespaceManager, path, requiresSessions, requiresDuplicateDetection, maxDeliveryCount, maxSizeInMegabytes);
}
catch (MessagingEntityAlreadyExistsException)
{
Expand All @@ -1491,25 +1506,35 @@ async Task SafeDeleteAndCreateQueueAsync(
string path,
bool requiresSessions,
bool requiresDuplicateDetection,
int maxDeliveryCount)
int maxDeliveryCount,
long maxSizeInMegabytes)
{
await SafeDeleteQueueAsync(namespaceManager, path);
await SafeCreateQueueAsync(namespaceManager, path, requiresSessions, requiresDuplicateDetection, maxDeliveryCount);
await SafeCreateQueueAsync(namespaceManager, path, requiresSessions, requiresDuplicateDetection, maxDeliveryCount, maxSizeInMegabytes);
}

private static readonly long[] ValidQueueSizes = { 1024L, 2048L, 3072L, 4096L, 5120L };
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

convention is to not include modifier for private


async Task CreateQueueAsync(
NamespaceManager namespaceManager,
string path,
bool requiresSessions,
bool requiresDuplicateDetection,
int maxDeliveryCount)
int maxDeliveryCount,
long maxSizeInMegabytes)
{
if (!ValidQueueSizes.Contains(maxSizeInMegabytes))
{
throw new ArgumentException($"The specified value {maxSizeInMegabytes} is invalid for the maximum queue size in megabytes.\r\nIt must be one of the following values:\r\n{string.Join(";", ValidQueueSizes)}", nameof(maxSizeInMegabytes));
}

var description = new QueueDescription(path)
{
RequiresSession = requiresSessions,
MaxDeliveryCount = maxDeliveryCount,
RequiresDuplicateDetection = requiresDuplicateDetection,
DuplicateDetectionHistoryTimeWindow = TimeSpan.FromHours(DuplicateDetectionWindowInHours)
DuplicateDetectionHistoryTimeWindow = TimeSpan.FromHours(DuplicateDetectionWindowInHours),
MaxSizeInMegabytes = maxSizeInMegabytes
};

await namespaceManager.CreateQueueAsync(description);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ public ServiceBusOrchestrationServiceSettings()
/// </summary>
public int MaxTrackingDeliveryCount { get; set; }

/// <summary>
/// Maximum queue size, in megabytes, for the service bus queues
/// </summary>
public long MaxQueueSizeInMegabytes { get; set; } = 1024;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1024L as default for consistency with the valid sizes list


/// <summary>
/// Gets the message prefetch count
/// </summary>
Expand Down
63 changes: 63 additions & 0 deletions src/DurableTask.Framework/TaskHubClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,69 @@ public async Task<OrchestrationInstance> CreateOrchestrationInstanceAsync(
return orchestrationInstance;
}

/// <summary>
/// Creates an orchestration instance, and raises an event for it, which eventually causes the OnEvent() method in the
/// orchestration to fire.
/// </summary>
/// <param name="orchestrationName">Name of the orchestration as specified by the ObjectCreator</param>
/// <param name="orchestrationVersion">Name of the orchestration as specified by the ObjectCreator</param>
/// <param name="instanceId">Instance id for the orchestration to be created, must be unique across the Task Hub</param>
/// <param name="eventName">Name of the event</param>
/// <param name="eventData">Data for the event</param>
public async Task RaiseEventAsync(
Copy link
Contributor

@simonporter simonporter Dec 1, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the naming of RaiseEventAsync here is a bit confusing as the meaning for an already running instance is quite different.
I think it would be better to have a CreateOrchestrationInstanceWithRaisedEventAsync (open to other naming suggestions) and have the various permutations for with/without tags etc.
Likely best to have an InternalCreateOrchestrationInstanceAsync that this and the CreateOrchestrationInstanceAsync can call to avoid duplication.

string orchestrationName,
string orchestrationVersion,
string instanceId,
string eventName,
object eventData)
{
if (string.IsNullOrWhiteSpace(instanceId))
{
instanceId = Guid.NewGuid().ToString("N");
}

var orchestrationInstance = new OrchestrationInstance
{
InstanceId = instanceId,
ExecutionId = Guid.NewGuid().ToString("N"),
};

var startedEvent = new ExecutionStartedEvent(-1, null)
{
Tags = null,
Name = orchestrationName,
Version = orchestrationVersion,
OrchestrationInstance = orchestrationInstance
};

string serializedInput = defaultConverter.Serialize(eventData);

var taskMessages = new[]
{
new TaskMessage
{
OrchestrationInstance = orchestrationInstance,
Event = startedEvent
},
new TaskMessage
{
OrchestrationInstance = new OrchestrationInstance
{
InstanceId = instanceId,

// to ensure that the event gets raised on the running
// orchestration instance, null the execution id
// so that it will find out which execution
// it should use for processing
ExecutionId = null
},
Event = new EventRaisedEvent(-1, serializedInput) {Name = eventName}
}
};

await this.serviceClient.SendTaskOrchestrationMessageAsync(taskMessages);
}

/// <summary>
/// Raises an event in the specified orchestration instance, which eventually causes the OnEvent() method in the
/// orchestration to fire.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,13 @@ public Task CreateTaskOrchestrationAsync(TaskMessage creationMessage)
return Task.FromResult<object>(null);
}

public Task SendTaskOrchestrationMessageAsync(TaskMessage message)
public Task SendTaskOrchestrationMessageAsync(params TaskMessage[] messages)
{
this.orchestratorQueue.SendMessage(message);
foreach (var message in messages)
{
this.orchestratorQueue.SendMessage(message);
}

return Task.FromResult<object>(null);
}

Expand Down