diff --git a/Framework/Common/ServiceBusUtils.cs b/Framework/Common/ServiceBusUtils.cs index 71fa7bf74..16e31017a 100644 --- a/Framework/Common/ServiceBusUtils.cs +++ b/Framework/Common/ServiceBusUtils.cs @@ -20,21 +20,25 @@ namespace DurableTask.Common using System.Threading.Tasks; using DurableTask.Settings; using DurableTask.Tracing; + using DurableTask.Tracking; using Microsoft.ServiceBus; using Microsoft.ServiceBus.Messaging; internal static class ServiceBusUtils { - public static BrokeredMessage GetBrokeredMessageFromObject(object serializableObject, CompressionSettings compressionSettings) + public static Task GetBrokeredMessageFromObjectAsync(object serializableObject, CompressionSettings compressionSettings) { - return GetBrokeredMessageFromObject(serializableObject, compressionSettings, null, null); + return GetBrokeredMessageFromObjectAsync(serializableObject, compressionSettings, new ServiceBusMessageSettings(), null, null, null, DateTime.MinValue); } - public static BrokeredMessage GetBrokeredMessageFromObject( + public static async Task GetBrokeredMessageFromObjectAsync( object serializableObject, CompressionSettings compressionSettings, + ServiceBusMessageSettings messageSettings, OrchestrationInstance instance, - string messageType) + string messageType, + IOrchestrationServiceBlobStore orchestrationServiceBlobStore, + DateTime messageFireTime) { if (serializableObject == null) { @@ -46,6 +50,11 @@ public static BrokeredMessage GetBrokeredMessageFromObject( return new BrokeredMessage(serializableObject) { SessionId = instance?.InstanceId }; } + if (messageSettings == null) + { + messageSettings = new ServiceBusMessageSettings(); + } + bool disposeStream = true; var rawStream = new MemoryStream(); @@ -60,23 +69,33 @@ public static BrokeredMessage GetBrokeredMessageFromObject( rawStream.Length > compressionSettings.ThresholdInBytes)) { Stream compressedStream = Utils.GetCompressedStream(rawStream); - - brokeredMessage = new BrokeredMessage(compressedStream, true); - brokeredMessage.Properties[FrameworkConstants.CompressionTypePropertyName] = - FrameworkConstants.CompressionTypeGzipPropertyValue; - var rawLen = rawStream.Length; TraceHelper.TraceInstance(TraceEventType.Information, instance, () => - "Compression stats for " + (messageType ?? string.Empty) + " : " + brokeredMessage.MessageId + + "Compression stats for " + (messageType ?? string.Empty) + " : " + + brokeredMessage.MessageId + ", uncompressed " + rawLen + " -> compressed " + compressedStream.Length); + + if (compressedStream.Length < messageSettings.MessageOverflowThresholdInBytes) + { + brokeredMessage = GenerateBrokeredMessageWithCompressionTypeProperty(compressedStream, FrameworkConstants.CompressionTypeGzipPropertyValue); + } + else + { + brokeredMessage = await GenerateBrokeredMessageWithBlobKeyPropertyAsync(compressedStream, orchestrationServiceBlobStore, instance, messageSettings, messageFireTime, FrameworkConstants.CompressionTypeGzipPropertyValue); + } } else { - brokeredMessage = new BrokeredMessage(rawStream, true); - disposeStream = false; - brokeredMessage.Properties[FrameworkConstants.CompressionTypePropertyName] = - FrameworkConstants.CompressionTypeNonePropertyValue; + if (rawStream.Length < messageSettings.MessageOverflowThresholdInBytes) + { + brokeredMessage = GenerateBrokeredMessageWithCompressionTypeProperty(rawStream, FrameworkConstants.CompressionTypeNonePropertyValue); + disposeStream = false; + } + else + { + brokeredMessage = await GenerateBrokeredMessageWithBlobKeyPropertyAsync(rawStream, orchestrationServiceBlobStore, instance, messageSettings, messageFireTime, FrameworkConstants.CompressionTypeNonePropertyValue); + } } brokeredMessage.SessionId = instance?.InstanceId; @@ -94,7 +113,55 @@ public static BrokeredMessage GetBrokeredMessageFromObject( } } - public static async Task GetObjectFromBrokeredMessageAsync(BrokeredMessage message) + static BrokeredMessage GenerateBrokeredMessageWithCompressionTypeProperty(Stream stream, string compressionType) + { + BrokeredMessage brokeredMessage = new BrokeredMessage(stream, true); + brokeredMessage.Properties[FrameworkConstants.CompressionTypePropertyName] = compressionType; + + return brokeredMessage; + } + + static async Task GenerateBrokeredMessageWithBlobKeyPropertyAsync( + Stream stream, + IOrchestrationServiceBlobStore orchestrationServiceBlobStore, + OrchestrationInstance instance, + ServiceBusMessageSettings messageSettings, + DateTime messageFireTime, + string compressionType) + { + if (stream.Length > messageSettings.MessageMaxSizeInBytes) + { + throw new ArgumentException( + $"The serialized message size {stream.Length} is larger than the supported external storage blob size {messageSettings.MessageMaxSizeInBytes}.", + "stream"); + } + + if (orchestrationServiceBlobStore == null) + { + throw new ArgumentException( + "Please provide an implementation of IOrchestrationServiceBlobStore for external storage.", + "orchestrationServiceBlobStore"); + } + + // save the compressed stream using external storage when it is larger + // than the supported message size limit. + // the stream is stored using the generated key, which is saved in the message property. + string blobKey = orchestrationServiceBlobStore.BuildMessageBlobKey(instance, messageFireTime); + + TraceHelper.TraceInstance( + TraceEventType.Information, + instance, + () => $"Saving the message stream in blob storage using key {blobKey}."); + await orchestrationServiceBlobStore.SaveStreamAsync(blobKey, stream); + + BrokeredMessage brokeredMessage = new BrokeredMessage(); + brokeredMessage.Properties[FrameworkConstants.MessageBlobKey] = blobKey; + brokeredMessage.Properties[FrameworkConstants.CompressionTypePropertyName] = compressionType; + + return brokeredMessage; + } + + public static async Task GetObjectFromBrokeredMessageAsync(BrokeredMessage message, IOrchestrationServiceBlobStore orchestrationServiceBlobStore) { if (message == null) { @@ -119,7 +186,7 @@ public static async Task GetObjectFromBrokeredMessageAsync(BrokeredMessage else if (string.Equals(compressionType, FrameworkConstants.CompressionTypeGzipPropertyValue, StringComparison.OrdinalIgnoreCase)) { - using (var compressedStream = message.GetBody()) + using (var compressedStream = await LoadMessageStreamAsync(message, orchestrationServiceBlobStore)) { if (!Utils.IsGzipStream(compressedStream)) { @@ -137,7 +204,7 @@ public static async Task GetObjectFromBrokeredMessageAsync(BrokeredMessage else if (string.Equals(compressionType, FrameworkConstants.CompressionTypeNonePropertyValue, StringComparison.OrdinalIgnoreCase)) { - using (var rawStream = message.GetBody()) + using (var rawStream = await LoadMessageStreamAsync(message, orchestrationServiceBlobStore)) { deserializedObject = Utils.ReadObjectFromStream(rawStream); } @@ -152,6 +219,33 @@ public static async Task GetObjectFromBrokeredMessageAsync(BrokeredMessage return deserializedObject; } + static Task LoadMessageStreamAsync(BrokeredMessage message, IOrchestrationServiceBlobStore orchestrationServiceBlobStore) + { + object blobKeyObj = null; + string blobKey = string.Empty; + + if (message.Properties.TryGetValue(FrameworkConstants.MessageBlobKey, out blobKeyObj)) + { + blobKey = (string)blobKeyObj; + } + + if (string.IsNullOrEmpty(blobKey)) + { + // load the stream from the message directly if the blob key property is not set, + // i.e., it is not stored externally + return Task.Run(() => message.GetBody()); + } + + // if the blob key is set in the message property, + // load the stream message from the service bus message store. + if (orchestrationServiceBlobStore == null) + { + throw new ArgumentException($"Failed to load compressed message from external storage with key: {blobKey}. Please provide an implementation of IServiceBusMessageStore for external storage.", nameof(orchestrationServiceBlobStore)); + } + + return orchestrationServiceBlobStore.LoadStreamAsync(blobKey); + } + public static void CheckAndLogDeliveryCount(string sessionId, IEnumerable messages, int maxDeliverycount) { foreach (BrokeredMessage message in messages) diff --git a/Framework/DurableTaskFramework.csproj b/Framework/DurableTaskFramework.csproj index d96903fcd..d59186820 100644 --- a/Framework/DurableTaskFramework.csproj +++ b/Framework/DurableTaskFramework.csproj @@ -105,14 +105,22 @@ + + + + + + + + diff --git a/Framework/FrameworkConstants.cs b/Framework/FrameworkConstants.cs index c73ce94aa..884c41f9a 100644 --- a/Framework/FrameworkConstants.cs +++ b/Framework/FrameworkConstants.cs @@ -60,7 +60,19 @@ internal class FrameworkConstants public const string CompressionTypeGzipPropertyValue = "gzip"; public const string CompressionTypeNonePropertyValue = "none"; + // message blob key in message property + // this property is a key to the message blob when it exceeds the message limit + public const string MessageBlobKey = "MessageBlobKey"; + // instance store constants - public const int MaxStringLengthForAzureTableColumn = 1024 * 15; // cut off at 15k * 2 bytes + public const int MaxStringLengthForAzureTableColumn = 1024 * 15; // cut off at 15k * 2 bytes + + // default settings for message size + public const int MessageOverflowThresholdInBytesDefault = 170 * 1024; + public const int MessageMaxSizeInBytesDefault = 10 * 1024 * 1024; + + // default settings for session size + public const int SessionOverflowThresholdInBytesDefault = 230 * 1024; + public const int SessionMaxSizeInBytesDefault = 10 * 1024 * 1024; } } \ No newline at end of file diff --git a/Framework/IOrchestrationServiceClient.cs b/Framework/IOrchestrationServiceClient.cs index 5ff5b2add..43027be0a 100644 --- a/Framework/IOrchestrationServiceClient.cs +++ b/Framework/IOrchestrationServiceClient.cs @@ -83,9 +83,10 @@ Task WaitForOrchestrationAsync( /// /// Purges orchestration instance state and history for orchestrations older than the specified threshold time. + /// Also purges the blob storage. /// /// Threshold date time in UTC /// What to compare the threshold date time against - Task PurgeOrchestrationInstanceHistoryAsync(DateTime thresholdDateTimeUtc, OrchestrationStateTimeRangeFilterType timeRangeFilterType); + Task PurgeOrchestrationHistoryAsync(DateTime thresholdDateTimeUtc, OrchestrationStateTimeRangeFilterType timeRangeFilterType); } } \ No newline at end of file diff --git a/Framework/OrchestrationSessionState.cs b/Framework/OrchestrationSessionState.cs new file mode 100644 index 000000000..742495c4d --- /dev/null +++ b/Framework/OrchestrationSessionState.cs @@ -0,0 +1,61 @@ +// ---------------------------------------------------------------------------------- +// Copyright Microsoft Corporation +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ---------------------------------------------------------------------------------- + +namespace DurableTask +{ + using System.Collections.Generic; + using DurableTask.History; + + /// + /// The object that represents the serialized session state. + /// It holds a list of history events (when blob key is empty), + /// or a key for external storage if the serialized stream is too large to fit into the the session state. + /// + internal class OrchestrationSessionState + { + /// + /// A constructor for deserialzation. + /// + public OrchestrationSessionState() + { + } + + /// + /// Wrap a list of history events into an OrchestrationSessionState instance, which will be later serialized as a stream saved in session state. + /// + /// /// A list of history events. + public OrchestrationSessionState(IList events) + { + this.Events = events; + } + + /// + /// Construct an OrchestrationSessionState instance with a blob key as the blob reference in the external blob storage. + /// + /// /// The blob key to access the blob + public OrchestrationSessionState(string blobKey) + { + this.BlobKey = blobKey; + } + + /// + /// List of all history events for runtime state + /// + public IList Events { get; set; } + + /// + /// The blob key for external storage. Could be null or empty if not externally stored. + /// + public string BlobKey { get; set; } + } +} diff --git a/Framework/Serializing/RuntimeStateStreamConverter.cs b/Framework/Serializing/RuntimeStateStreamConverter.cs new file mode 100644 index 000000000..51fd114b0 --- /dev/null +++ b/Framework/Serializing/RuntimeStateStreamConverter.cs @@ -0,0 +1,255 @@ +// ---------------------------------------------------------------------------------- +// Copyright Microsoft Corporation +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ---------------------------------------------------------------------------------- + +namespace DurableTask.Serializing +{ + using System; + using System.Collections.Generic; + using System.Diagnostics; + using System.IO; + using System.Threading.Tasks; + using DurableTask.Common; + using DurableTask.Exceptions; + using DurableTask.History; + using DurableTask.Settings; + using DurableTask.Tracing; + using DurableTask.Tracking; + + /// + /// A converter that does conversion between the OrchestrationRuntimeState instance and a stream after serialization. + /// The stream is a serialized OrchestrationSessionState that will set as session state. + /// De-serialization is done with fallbacks in the order: OrchestrationSessionState -> OrchestrationRuntimeState -> IList of HistoryEvent. + /// + class RuntimeStateStreamConverter + { + /// + /// Convert an OrchestrationRuntimeState instance to a serialized raw stream to be saved in session state. + /// + /// The new OrchestrationRuntimeState to be serialized + /// The current runtime state + /// A data converter for serialization and deserialization + /// True if should compress when serialization + /// The service bus session settings + /// A blob store for external blob storage + /// The session id + /// A serialized raw strem to be saved in session state + public static async Task OrchestrationRuntimeStateToRawStream( + OrchestrationRuntimeState newOrchestrationRuntimeState, + OrchestrationRuntimeState runtimeState, + DataConverter dataConverter, + bool shouldCompress, + ServiceBusSessionSettings serviceBusSessionSettings, + IOrchestrationServiceBlobStore orchestrationServiceBlobStore, + string sessionId) + { + OrchestrationSessionState orchestrationSessionState = new OrchestrationSessionState(newOrchestrationRuntimeState.Events); + string serializedState = dataConverter.Serialize(orchestrationSessionState); + + long originalStreamSize = 0; + Stream compressedState = Utils.WriteStringToStream( + serializedState, + shouldCompress, + out originalStreamSize); + + runtimeState.Size = originalStreamSize; + runtimeState.CompressedSize = compressedState.Length; + + if (runtimeState.CompressedSize > serviceBusSessionSettings.SessionMaxSizeInBytes) + { + throw new OrchestrationException($"Session state size of {runtimeState.CompressedSize} exceeded the termination threshold of {serviceBusSessionSettings.SessionMaxSizeInBytes} bytes"); + } + + if (runtimeState.CompressedSize > serviceBusSessionSettings.SessionOverflowThresholdInBytes) + { + TraceHelper.TraceSession(TraceEventType.Information, + sessionId, + $"Session state size of {runtimeState.CompressedSize} exceeded the termination threshold of {serviceBusSessionSettings.SessionOverflowThresholdInBytes} bytes." + + $"Creating an OrchestrationSessionState instance with key for exteranl storage."); + return await CreateStreamForExternalStorageAsync(shouldCompress, orchestrationServiceBlobStore, sessionId, dataConverter, compressedState); + } + + return compressedState; + } + + async static Task CreateStreamForExternalStorageAsync( + bool shouldCompress, + IOrchestrationServiceBlobStore orchestrationServiceBlobStore, + string sessionId, + DataConverter dataConverter, + Stream compressedState) + { + if (orchestrationServiceBlobStore == null) + { + throw new OrchestrationException( + "The compressed session is larger than supported. " + + "Please provide an implementation of IOrchestrationServiceBlobStore for external storage."); + } + + // create a new orchestration session state with the external blob key + string key = orchestrationServiceBlobStore.BuildSessionBlobKey(sessionId); + TraceHelper.TraceSession( + TraceEventType.Information, + sessionId, + $"Saving the serialized stream in external storage with key {key}."); + + // save the compressedState stream externally as a blob + await orchestrationServiceBlobStore.SaveStreamAsync(key, compressedState); + + // create an OrchestrationSessionState instance to hold the blob key, + // and then serialize the instance as a sream for the session state + OrchestrationSessionState orchestrationSessionState = new OrchestrationSessionState(key); + string serializedStateExternal = dataConverter.Serialize(orchestrationSessionState); + + long streamSize; + Stream compressedStateForSession = Utils.WriteStringToStream( + serializedStateExternal, + shouldCompress, + out streamSize); + return compressedStateForSession; + } + + /// + /// Convert a raw stream to an orchestration runtime state instance. + /// + /// The raw session stream to be deserialized + /// The session Id + /// A blob store for external blob storage + /// >A data converter for serialization and deserialization + /// + public static async Task RawStreamToRuntimeState(Stream rawSessionStream, string sessionId, IOrchestrationServiceBlobStore orchestrationServiceBlobStore, DataConverter dataConverter) + { + bool isEmptySession; + OrchestrationRuntimeState runtimeState; + Stream sessionStream = await Utils.GetDecompressedStreamAsync(rawSessionStream); + + isEmptySession = sessionStream == null; + long rawSessionStateSize = isEmptySession ? 0 : rawSessionStream.Length; + long newSessionStateSize = isEmptySession ? 0 : sessionStream.Length; + + string blobKey; + runtimeState = GetOrCreateInstanceState(sessionStream, sessionId, dataConverter, out blobKey); + + if (string.IsNullOrWhiteSpace(blobKey)) + { + TraceHelper.TraceSession( + TraceEventType.Information, + sessionId, + $"Size of session state is {newSessionStateSize}, compressed {rawSessionStateSize}"); + return runtimeState; + } + + if (orchestrationServiceBlobStore == null) + { + throw new OrchestrationException( + $"Please provide an implementation of IOrchestrationServiceBlobStore for external storage to load the runtime state."); + } + + TraceHelper.TraceSession( + TraceEventType.Information, + sessionId, + $"Loading the serialzied stream from external storage with blob key {blobKey}."); + + Stream externalStream = await orchestrationServiceBlobStore.LoadStreamAsync(blobKey); + return await RawStreamToRuntimeState(externalStream, sessionId, orchestrationServiceBlobStore, dataConverter); + } + + static OrchestrationRuntimeState GetOrCreateInstanceState(Stream stateStream, string sessionId, DataConverter dataConverter, out string blobKey) + { + OrchestrationRuntimeState runtimeState; + blobKey = string.Empty; + if (stateStream == null) + { + TraceHelper.TraceSession( + TraceEventType.Information, + sessionId, + "No session state exists, creating new session state."); + runtimeState = new OrchestrationRuntimeState(); + } + else + { + if (stateStream.Position != 0) + { + throw TraceHelper.TraceExceptionSession( + TraceEventType.Error, + sessionId, + new ArgumentException("Stream is partially consumed")); + } + + string serializedState = null; + using (var reader = new StreamReader(stateStream)) + { + serializedState = reader.ReadToEnd(); + } + + runtimeState = DeserializeToRuntimeStateWithFallback(serializedState, dataConverter, sessionId, out blobKey); + } + + return runtimeState; + } + + /// + /// Deserialize the session state to construct an OrchestrationRuntimeState instance. + /// + /// The session state string could be one of these: + /// 1. a serialized IList of HistoryEvent (master branch implementation), or + /// 2. a serialized OrchestrationRuntimeState instance with the history event list (vnext branch implementation), or + /// 3. a serialized OrchestrationSessionState instance with the history event list or a blob key (latest implementation). + /// + /// So when doing the deserialization, it is done with fallbacks in the order: OrchestrationSessionState -> OrchestrationRuntimeState -> IList of HistoryEvent, to cover all cases. + /// + /// + /// The serialized session state + /// A data converter for serialization and deserialization + /// The session Id + /// The blob key output. Will be set if the state is in external storage. + /// The converted orchestration runtime state. + static OrchestrationRuntimeState DeserializeToRuntimeStateWithFallback(string serializedState, DataConverter dataConverter, string sessionId, out string blobKey) + { + OrchestrationRuntimeState runtimeState = null; + blobKey = string.Empty; + try + { + var sessionState = + dataConverter.Deserialize(serializedState); + runtimeState = new OrchestrationRuntimeState(sessionState.Events); + blobKey = sessionState.BlobKey; + } + catch (Exception exception) + { + TraceHelper.TraceSession( + TraceEventType.Warning, + sessionId, + $"Failed to deserialize session state to OrchestrationSessionState object: {serializedState}. More info: {exception.StackTrace}"); + try + { + var restoredState = + dataConverter.Deserialize(serializedState); + // Create a new Object with just the events, we don't want the rest + runtimeState = new OrchestrationRuntimeState(restoredState.Events); + } + catch (Exception e) + { + TraceHelper.TraceSession( + TraceEventType.Warning, + sessionId, + $"Failed to deserialize session state to OrchestrationRuntimeState object: {serializedState}. More info: {e.StackTrace}"); + + var events = dataConverter.Deserialize>(serializedState); + runtimeState = new OrchestrationRuntimeState(events); + } + } + + return runtimeState; + } + } +} diff --git a/Framework/ServiceBusOrchestrationService.cs b/Framework/ServiceBusOrchestrationService.cs index 013a961d2..1bb7f7fb1 100644 --- a/Framework/ServiceBusOrchestrationService.cs +++ b/Framework/ServiceBusOrchestrationService.cs @@ -46,8 +46,7 @@ public class ServiceBusOrchestrationService : IOrchestrationService, IOrchestrat // This also has an impact on prefetch count as PrefetchCount cannot be greater than this value // as every fetched message also creates a tracking message which counts towards this limit. const int MaxMessageCount = 80; - const int SessionStreamWarningSizeInBytes = 200 * 1024; - const int SessionStreamTerminationThresholdInBytes = 230 * 1024; + const int SessionStreamWarningSizeInBytes = 150 * 1024; const int StatusPollingIntervalInSeconds = 2; const int DuplicateDetectionWindowInHours = 4; @@ -61,6 +60,11 @@ public class ServiceBusOrchestrationService : IOrchestrationService, IOrchestrat /// public readonly IOrchestrationServiceInstanceStore InstanceStore; + /// + /// Blob store for oversized messages and sessions + /// + public readonly IOrchestrationServiceBlobStore BlobStore; + /// /// Statistics for the orchestration service /// @@ -92,11 +96,13 @@ public class ServiceBusOrchestrationService : IOrchestrationService, IOrchestrat /// Service Bus connection string /// Hubname to use with the connection string /// Instance store Provider, where state and history messages will be stored + /// Blob store Provider, where oversized messages and sessions will be stored /// Settings object for service and client public ServiceBusOrchestrationService( string connectionString, string hubName, IOrchestrationServiceInstanceStore instanceStore, + IOrchestrationServiceBlobStore blobStore, ServiceBusOrchestrationServiceSettings settings) { this.connectionString = connectionString; @@ -107,6 +113,7 @@ public ServiceBusOrchestrationService( orchestratorEntityName = string.Format(FrameworkConstants.OrchestratorEndpointFormat, this.hubName); trackingEntityName = string.Format(FrameworkConstants.TrackingEndpointFormat, this.hubName); this.Settings = settings ?? new ServiceBusOrchestrationServiceSettings(); + this.BlobStore = blobStore; if (instanceStore != null) { this.InstanceStore = instanceStore; @@ -272,6 +279,11 @@ await Task.WhenAll( await InstanceStore.DeleteStoreAsync(); } } + + if (BlobStore != null) + { + await BlobStore.DeleteStoreAsync(); + } } // Service Bus Utility methods @@ -430,9 +442,9 @@ public async Task LockNextTaskOrchestrationWorkItemAs ServiceBusUtils.CheckAndLogDeliveryCount(session.SessionId, newMessages, this.Settings.MaxTaskOrchestrationDeliveryCount); IList newTaskMessages = await Task.WhenAll( - newMessages.Select(async message => await ServiceBusUtils.GetObjectFromBrokeredMessageAsync(message))); + newMessages.Select(async message => await ServiceBusUtils.GetObjectFromBrokeredMessageAsync(message, this.BlobStore))); - OrchestrationRuntimeState runtimeState = await GetSessionState(session); + OrchestrationRuntimeState runtimeState = await GetSessionStateAsync(session, this.BlobStore); long maxSequenceNumber = newMessages .OrderByDescending(message => message.SequenceNumber) @@ -585,27 +597,30 @@ public async Task CompleteTaskOrchestrationWorkItemAsync( Transaction.Current.TransactionInformation.LocalIdentifier }"); - if (await TrySetSessionState(workItem, newOrchestrationRuntimeState, runtimeState, session)) + if (await TrySetSessionStateAsync(workItem, newOrchestrationRuntimeState, runtimeState, session)) { - if (runtimeState.CompressedSize > SessionStreamWarningSizeInBytes) + if (runtimeState.CompressedSize > SessionStreamWarningSizeInBytes && runtimeState.CompressedSize < Settings.SessionSettings.SessionOverflowThresholdInBytes) { TraceHelper.TraceSession( TraceEventType.Error, workItem.InstanceId, - $"Size of session state ({runtimeState.CompressedSize}B) is nearing session size limit of {SessionStreamTerminationThresholdInBytes}B"); + $"Size of session state ({runtimeState.CompressedSize}B) is nearing session size limit of {Settings.SessionSettings.SessionOverflowThresholdInBytes}B"); } - // We need to .ToList() the IEnumerable otherwise GetBrokeredMessageFromObject gets called 5 times per message due to Service Bus doing multiple enumeration + // We need to .ToList() the IEnumerable otherwise GetBrokeredMessageFromObjectAsync gets called 5 times per message due to Service Bus doing multiple enumeration if (outboundMessages?.Count > 0) { await workerSender.SendBatchAsync( - outboundMessages.Select(m => - ServiceBusUtils.GetBrokeredMessageFromObject( + await Task.WhenAll(outboundMessages.Select(m => + ServiceBusUtils.GetBrokeredMessageFromObjectAsync( m, Settings.MessageCompressionSettings, + Settings.MessageSettings, null, - "Worker outbound message")) - .ToList() + "Worker outbound message", + this.BlobStore, + DateTime.MinValue)) + .ToList()) ); this.ServiceStats.ActivityDispatcherStats.MessageBatchesSent.Increment(); this.ServiceStats.ActivityDispatcherStats.MessagesSent.Increment(outboundMessages.Count); @@ -614,17 +629,21 @@ await workerSender.SendBatchAsync( if (timerMessages?.Count > 0) { await orchestratorQueueClient.SendBatchAsync( - timerMessages.Select(m => + await Task.WhenAll(timerMessages.Select(async m => { - BrokeredMessage message = ServiceBusUtils.GetBrokeredMessageFromObject( + DateTime messageFireTime = ((TimerFiredEvent)m.Event).FireAt; + BrokeredMessage message = await ServiceBusUtils.GetBrokeredMessageFromObjectAsync( m, Settings.MessageCompressionSettings, + Settings.MessageSettings, newOrchestrationRuntimeState.OrchestrationInstance, - "Timer Message"); - message.ScheduledEnqueueTimeUtc = ((TimerFiredEvent)m.Event).FireAt; + "Timer Message", + this.BlobStore, + messageFireTime); + message.ScheduledEnqueueTimeUtc = messageFireTime; return message; }) - .ToList() + .ToList()) ); this.ServiceStats.OrchestrationDispatcherStats.MessageBatchesSent.Increment(); this.ServiceStats.OrchestrationDispatcherStats.MessagesSent.Increment(timerMessages.Count); @@ -633,13 +652,16 @@ await orchestratorQueueClient.SendBatchAsync( if (orchestratorMessages?.Count > 0) { await orchestratorQueueClient.SendBatchAsync( - orchestratorMessages.Select(m => - ServiceBusUtils.GetBrokeredMessageFromObject( + await Task.WhenAll(orchestratorMessages.Select(m => + ServiceBusUtils.GetBrokeredMessageFromObjectAsync( m, Settings.MessageCompressionSettings, + Settings.MessageSettings, m.OrchestrationInstance, - "Sub Orchestration")) - .ToList() + "Sub Orchestration", + this.BlobStore, + DateTime.MinValue)) + .ToList()) ); this.ServiceStats.OrchestrationDispatcherStats.MessageBatchesSent.Increment(); this.ServiceStats.OrchestrationDispatcherStats.MessagesSent.Increment(orchestratorMessages.Count); @@ -648,11 +670,14 @@ await orchestratorQueueClient.SendBatchAsync( if (continuedAsNewMessage != null) { await orchestratorQueueClient.SendAsync( - ServiceBusUtils.GetBrokeredMessageFromObject( + await ServiceBusUtils.GetBrokeredMessageFromObjectAsync( continuedAsNewMessage, Settings.MessageCompressionSettings, + Settings.MessageSettings, newOrchestrationRuntimeState.OrchestrationInstance, - "Continue as new") + "Continue as new", + this.BlobStore, + DateTime.MinValue) ); this.ServiceStats.OrchestrationDispatcherStats.MessageBatchesSent.Increment(); this.ServiceStats.OrchestrationDispatcherStats.MessagesSent.Increment(); @@ -660,7 +685,7 @@ await orchestratorQueueClient.SendAsync( if (InstanceStore != null) { - List trackingMessages = CreateTrackingMessages(runtimeState, sessionState.SequenceNumber); + List trackingMessages = await CreateTrackingMessagesAsync(runtimeState, sessionState.SequenceNumber); TraceHelper.TraceInstance(TraceEventType.Information, runtimeState.OrchestrationInstance, "Created {0} tracking messages", trackingMessages.Count); @@ -764,7 +789,7 @@ public async Task LockNextTaskActivityWorkItem(TimeSpan re receivedMessage.SessionId, GetFormattedLog($"New message to process: {receivedMessage.MessageId} [{receivedMessage.SequenceNumber}]")); - TaskMessage taskMessage = await ServiceBusUtils.GetObjectFromBrokeredMessageAsync(receivedMessage); + TaskMessage taskMessage = await ServiceBusUtils.GetObjectFromBrokeredMessageAsync(receivedMessage, this.BlobStore); ServiceBusUtils.CheckAndLogDeliveryCount(receivedMessage, Settings.MaxTaskActivityDeliveryCount); @@ -831,11 +856,14 @@ public async Task RenewTaskActivityWorkItemLockAsync(TaskA /// The response message to send public async Task CompleteTaskActivityWorkItemAsync(TaskActivityWorkItem workItem, TaskMessage responseMessage) { - BrokeredMessage brokeredResponseMessage = ServiceBusUtils.GetBrokeredMessageFromObject( + BrokeredMessage brokeredResponseMessage = await ServiceBusUtils.GetBrokeredMessageFromObjectAsync( responseMessage, Settings.MessageCompressionSettings, + Settings.MessageSettings, workItem.TaskMessage.OrchestrationInstance, - $"Response for {workItem.TaskMessage.OrchestrationInstance.InstanceId}"); + $"Response for {workItem.TaskMessage.OrchestrationInstance.InstanceId}", + this.BlobStore, + DateTime.MinValue); var originalMessage = GetAndDeleteBrokeredMessageForWorkItem(workItem); if (originalMessage == null) @@ -951,11 +979,14 @@ public async Task UpdateJumpStartStoreAsync(TaskMessage creationMessage) /// The task message to be sent for the orchestration public async Task SendTaskOrchestrationMessageAsync(TaskMessage message) { - BrokeredMessage brokeredMessage = ServiceBusUtils.GetBrokeredMessageFromObject( + BrokeredMessage brokeredMessage = await ServiceBusUtils.GetBrokeredMessageFromObjectAsync( message, Settings.MessageCompressionSettings, + Settings.MessageSettings, message.OrchestrationInstance, - "SendTaskOrchestrationMessage"); + "SendTaskOrchestrationMessage", + this.BlobStore, + DateTime.MinValue); // Use duplicate detection of ExecutionStartedEvent by addin messageId var executionStartedEvent = message.Event as ExecutionStartedEvent; @@ -1069,20 +1100,27 @@ public async Task GetOrchestrationHistoryAsync(string instanceId, string /// /// Purges orchestration instance state and history for orchestrations older than the specified threshold time. + /// Also purges the blob storage. /// /// Threshold date time in UTC /// What to compare the threshold date time against - public async Task PurgeOrchestrationInstanceHistoryAsync( + public async Task PurgeOrchestrationHistoryAsync( DateTime thresholdDateTimeUtc, OrchestrationStateTimeRangeFilterType timeRangeFilterType) { - ThrowIfInstanceStoreNotConfigured(); - TraceHelper.Trace(TraceEventType.Information, $"Purging orchestration instances before: {thresholdDateTimeUtc}, Type: {timeRangeFilterType}"); - int purgedEvents = await InstanceStore.PurgeOrchestrationHistoryEventsAsync(thresholdDateTimeUtc, timeRangeFilterType); + if (this.BlobStore != null) + { + await this.BlobStore.PurgeExpiredBlobsAsync(thresholdDateTimeUtc); + TraceHelper.Trace(TraceEventType.Information, $"Blob storage is purged."); + } - TraceHelper.Trace(TraceEventType.Information, $"Purged {purgedEvents} orchestration histories"); + if (InstanceStore != null) + { + int purgedEvents = await InstanceStore.PurgeOrchestrationHistoryEventsAsync(thresholdDateTimeUtc, timeRangeFilterType); + TraceHelper.Trace(TraceEventType.Information, $"Purged {purgedEvents} orchestration histories"); + } } /// @@ -1120,7 +1158,7 @@ async Task FetchTrackingWorkItemAsync(TimeSpan receiveTimeout) ServiceBusUtils.CheckAndLogDeliveryCount(newMessages, Settings.MaxTrackingDeliveryCount); IList newTaskMessages = await Task.WhenAll( - newMessages.Select(async message => await ServiceBusUtils.GetObjectFromBrokeredMessageAsync(message))); + newMessages.Select(async message => await ServiceBusUtils.GetObjectFromBrokeredMessageAsync(message, this.BlobStore))); var lockTokens = newMessages.ToDictionary(m => m.LockToken, m => m); var sessionState = new ServiceBusOrchestrationSession @@ -1143,7 +1181,7 @@ async Task FetchTrackingWorkItemAsync(TimeSpan receiveTimeout) /// /// The orchestation runtime state /// Sequence number for the created tracking messages - List CreateTrackingMessages(OrchestrationRuntimeState runtimeState, long sequenceNumber) + async Task> CreateTrackingMessagesAsync(OrchestrationRuntimeState runtimeState, long sequenceNumber) { var trackingMessages = new List(); @@ -1168,11 +1206,14 @@ List CreateTrackingMessages(OrchestrationRuntimeState runtimeSt OrchestrationInstance = runtimeState.OrchestrationInstance }; - BrokeredMessage trackingMessage = ServiceBusUtils.GetBrokeredMessageFromObject( + BrokeredMessage trackingMessage = await ServiceBusUtils.GetBrokeredMessageFromObjectAsync( taskMessage, Settings.MessageCompressionSettings, + Settings.MessageSettings, runtimeState.OrchestrationInstance, - "History Tracking Message"); + "History Tracking Message", + this.BlobStore, + DateTime.MinValue); trackingMessages.Add(trackingMessage); } } @@ -1184,11 +1225,14 @@ List CreateTrackingMessages(OrchestrationRuntimeState runtimeSt OrchestrationInstance = runtimeState.OrchestrationInstance }; - BrokeredMessage brokeredStateMessage = ServiceBusUtils.GetBrokeredMessageFromObject( + BrokeredMessage brokeredStateMessage = await ServiceBusUtils.GetBrokeredMessageFromObjectAsync( stateMessage, Settings.MessageCompressionSettings, + Settings.MessageSettings, runtimeState.OrchestrationInstance, - "State Tracking Message"); + "State Tracking Message", + BlobStore, + DateTime.MinValue); trackingMessages.Add(brokeredStateMessage); return trackingMessages; @@ -1318,31 +1362,17 @@ string GetFormattedLog(string input) return input; } - async Task GetSessionState(MessageSession session) + async Task GetSessionStateAsync(MessageSession session, IOrchestrationServiceBlobStore orchestrationServiceBlobStore) { - long rawSessionStateSize; - long newSessionStateSize; - OrchestrationRuntimeState runtimeState; - bool isEmptySession; - using (Stream rawSessionStream = await session.GetStateAsync()) - using (Stream sessionStream = await Utils.GetDecompressedStreamAsync(rawSessionStream)) { this.ServiceStats.OrchestrationDispatcherStats.SessionGets.Increment(); - isEmptySession = sessionStream == null; - rawSessionStateSize = isEmptySession ? 0 : rawSessionStream.Length; - newSessionStateSize = isEmptySession ? 0 : sessionStream.Length; - - runtimeState = GetOrCreateInstanceState(sessionStream, session.SessionId); + return await RuntimeStateStreamConverter.RawStreamToRuntimeState(rawSessionStream, session.SessionId, orchestrationServiceBlobStore, DataConverter); } - - TraceHelper.TraceSession(TraceEventType.Information, session.SessionId, - $"Size of session state is {newSessionStateSize}, compressed {rawSessionStateSize}"); - - return runtimeState; + } - async Task TrySetSessionState( + async Task TrySetSessionStateAsync( TaskOrchestrationWorkItem workItem, OrchestrationRuntimeState newOrchestrationRuntimeState, OrchestrationRuntimeState runtimeState, @@ -1356,45 +1386,43 @@ async Task TrySetSessionState( return true; } - string serializedState = DataConverter.Serialize(newOrchestrationRuntimeState); - - long originalStreamSize = 0; - using ( - Stream compressedState = Utils.WriteStringToStream( - serializedState, - Settings.TaskOrchestrationDispatcherSettings.CompressOrchestrationState, - out originalStreamSize)) + try { - runtimeState.Size = originalStreamSize; - runtimeState.CompressedSize = compressedState.Length; - if (runtimeState.CompressedSize > SessionStreamTerminationThresholdInBytes) - { - // basic idea is to simply enqueue a terminate message just like how we do it from taskhubclient - // it is possible to have other messages in front of the queue and those will get processed before - // the terminate message gets processed. but that is ok since in the worst case scenario we will - // simply land in this if-block again and end up queuing up another terminate message. - // - // the interesting scenario is when the second time we *dont* land in this if-block because e.g. - // the new messages that we processed caused a new generation to be created. in that case - // it is still ok because the worst case scenario is that we will terminate a newly created generation - // which shouldn't have been created at all in the first place - - isSessionSizeThresholdExceeded = true; - - string reason = $"Session state size of {runtimeState.CompressedSize} exceeded the termination threshold of {SessionStreamTerminationThresholdInBytes} bytes"; - TraceHelper.TraceSession(TraceEventType.Critical, workItem.InstanceId, reason); - - BrokeredMessage forcedTerminateMessage = CreateForcedTerminateMessage(runtimeState.OrchestrationInstance.InstanceId, reason); - - await orchestratorQueueClient.SendAsync(forcedTerminateMessage); - this.ServiceStats.OrchestrationDispatcherStats.MessagesSent.Increment(); - this.ServiceStats.OrchestrationDispatcherStats.MessageBatchesSent.Increment(); - } - else - { - await session.SetStateAsync(compressedState); - this.ServiceStats.OrchestrationDispatcherStats.SessionSets.Increment(); - } + Stream rawStream = await + RuntimeStateStreamConverter.OrchestrationRuntimeStateToRawStream( + newOrchestrationRuntimeState, + runtimeState, + DataConverter, + Settings.TaskOrchestrationDispatcherSettings.CompressOrchestrationState, + Settings.SessionSettings, + this.BlobStore, + session.SessionId); + + await session.SetStateAsync(rawStream); + this.ServiceStats.OrchestrationDispatcherStats.SessionSets.Increment(); + } + catch (OrchestrationException exception) + { + // basic idea is to simply enqueue a terminate message just like how we do it from taskhubclient + // it is possible to have other messages in front of the queue and those will get processed before + // the terminate message gets processed. but that is ok since in the worst case scenario we will + // simply land in this if-block again and end up queuing up another terminate message. + // + // the interesting scenario is when the second time we *dont* land in this if-block because e.g. + // the new messages that we processed caused a new generation to be created. in that case + // it is still ok because the worst case scenario is that we will terminate a newly created generation + // which shouldn't have been created at all in the first place + + isSessionSizeThresholdExceeded = true; + + string reason = $"Session state size of {runtimeState.CompressedSize} exceeded the termination threshold of {Settings.SessionSettings.SessionMaxSizeInBytes} bytes. More info: {exception.StackTrace}"; + TraceHelper.TraceSession(TraceEventType.Critical, workItem.InstanceId, reason); + + BrokeredMessage forcedTerminateMessage = await CreateForcedTerminateMessageAsync(runtimeState.OrchestrationInstance.InstanceId, reason); + await orchestratorQueueClient.SendAsync(forcedTerminateMessage); + + this.ServiceStats.OrchestrationDispatcherStats.MessagesSent.Increment(); + this.ServiceStats.OrchestrationDispatcherStats.MessageBatchesSent.Increment(); } return !isSessionSizeThresholdExceeded; @@ -1476,7 +1504,7 @@ async Task CreateQueueAsync( await namespaceManager.CreateQueueAsync(description); } - BrokeredMessage CreateForcedTerminateMessage(string instanceId, string reason) + Task CreateForcedTerminateMessageAsync(string instanceId, string reason) { var newOrchestrationInstance = new OrchestrationInstance { InstanceId = instanceId }; var taskMessage = new TaskMessage @@ -1485,44 +1513,14 @@ BrokeredMessage CreateForcedTerminateMessage(string instanceId, string reason) Event = new ExecutionTerminatedEvent(-1, reason) }; - BrokeredMessage message = ServiceBusUtils.GetBrokeredMessageFromObject( + return ServiceBusUtils.GetBrokeredMessageFromObjectAsync( taskMessage, Settings.MessageCompressionSettings, + Settings.MessageSettings, newOrchestrationInstance, - "Forced Terminate"); - - return message; - } - - static OrchestrationRuntimeState GetOrCreateInstanceState(Stream stateStream, string sessionId) - { - OrchestrationRuntimeState runtimeState; - if (stateStream == null) - { - TraceHelper.TraceSession(TraceEventType.Information, sessionId, - "No session state exists, creating new session state."); - runtimeState = new OrchestrationRuntimeState(); - } - else - { - if (stateStream.Position != 0) - { - throw TraceHelper.TraceExceptionSession(TraceEventType.Error, sessionId, - new ArgumentException("Stream is partially consumed")); - } - - string serializedState = null; - using (var reader = new StreamReader(stateStream)) - { - serializedState = reader.ReadToEnd(); - } - - OrchestrationRuntimeState restoredState = DataConverter.Deserialize(serializedState); - // Create a new Object with just the events, we don't want the rest - runtimeState = new OrchestrationRuntimeState(restoredState.Events); - } - - return runtimeState; + "Forced Terminate", + this.BlobStore, + DateTime.MinValue); } void ThrowIfInstanceStoreNotConfigured() diff --git a/Framework/Settings/ServiceBusMessageSettings.cs b/Framework/Settings/ServiceBusMessageSettings.cs new file mode 100644 index 000000000..5edc75b75 --- /dev/null +++ b/Framework/Settings/ServiceBusMessageSettings.cs @@ -0,0 +1,44 @@ +// ---------------------------------------------------------------------------------- +// Copyright Microsoft Corporation +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ---------------------------------------------------------------------------------- + +namespace DurableTask.Settings +{ + /// + /// Settings to configure the Service Bus message. + /// TODO: add a flag OverflowEnabled to indicate if the overflow settings are enabled + /// + public class ServiceBusMessageSettings + { + + internal ServiceBusMessageSettings() : + this(FrameworkConstants.MessageOverflowThresholdInBytesDefault, FrameworkConstants.MessageMaxSizeInBytesDefault) + { + } + + internal ServiceBusMessageSettings(int messageOverflowThresholdInBytes, int messageMaxSizeInBytes) + { + MessageOverflowThresholdInBytes = messageOverflowThresholdInBytes; + MessageMaxSizeInBytes = messageMaxSizeInBytes; + } + + /// + /// The max allowed message size in service bus. Default is 170K. + /// + public int MessageOverflowThresholdInBytes { get; set; } + + /// + /// The max allowed message size for external storage. Default is 10M. + /// + public int MessageMaxSizeInBytes { get; set; } + } +} diff --git a/Framework/Settings/ServiceBusOrchestrationServiceSettings.cs b/Framework/Settings/ServiceBusOrchestrationServiceSettings.cs index 3bf7fa17f..dea64c422 100644 --- a/Framework/Settings/ServiceBusOrchestrationServiceSettings.cs +++ b/Framework/Settings/ServiceBusOrchestrationServiceSettings.cs @@ -32,6 +32,9 @@ public ServiceBusOrchestrationServiceSettings() TaskActivityDispatcherSettings = new TaskActivityDispatcherSettings(); TrackingDispatcherSettings = new TrackingDispatcherSettings(); JumpStartSettings = new JumpStartSettings(); + SessionSettings = new ServiceBusSessionSettings(); + MessageSettings = new ServiceBusMessageSettings(); + MessageCompressionSettings = new CompressionSettings { Style = CompressionStyle.Never, @@ -96,7 +99,18 @@ public ServiceBusOrchestrationServiceSettings() /// Enable compression of messages. Allows exchange of larger parameters and return values with activities at the cost /// of additional CPU. /// Default is false. + /// TODO: move this setting into ServiceBusSessionSettings and ServiceBusMessageSettings. /// public CompressionSettings MessageCompressionSettings { get; set; } + + /// + /// Settings to configure the session + /// + public ServiceBusSessionSettings SessionSettings { get; set; } + + /// + /// Settings to configure the message + /// + public ServiceBusMessageSettings MessageSettings { get; set; } } } \ No newline at end of file diff --git a/Framework/Settings/ServiceBusSessionSettings.cs b/Framework/Settings/ServiceBusSessionSettings.cs new file mode 100644 index 000000000..2411e3201 --- /dev/null +++ b/Framework/Settings/ServiceBusSessionSettings.cs @@ -0,0 +1,43 @@ +// ---------------------------------------------------------------------------------- +// Copyright Microsoft Corporation +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ---------------------------------------------------------------------------------- + +namespace DurableTask.Settings +{ + /// + /// Settings to configure the Service Bus session. + /// TODO: add a flag OverflowEnabled to indicate if the overflow settings are enabled + /// + public class ServiceBusSessionSettings + { + internal ServiceBusSessionSettings() : + this (FrameworkConstants.SessionOverflowThresholdInBytesDefault, FrameworkConstants.SessionMaxSizeInBytesDefault) + { + } + + internal ServiceBusSessionSettings(int sessionOverflowThresholdInBytes, int sessionMaxSizeInBytes) + { + SessionOverflowThresholdInBytes = sessionOverflowThresholdInBytes; + SessionMaxSizeInBytes = sessionMaxSizeInBytes; + } + + /// + /// The max allowed session size in service bus. Default is 230K. + /// + public int SessionOverflowThresholdInBytes { get; set; } + + /// + /// The max allowed session size for external storage. Default is 10M. + /// + public int SessionMaxSizeInBytes { get; set; } + } +} diff --git a/Framework/TaskHubClient.cs b/Framework/TaskHubClient.cs index c9fa952c4..416a9fac1 100644 --- a/Framework/TaskHubClient.cs +++ b/Framework/TaskHubClient.cs @@ -337,7 +337,7 @@ public Task GetOrchestrationHistoryAsync(OrchestrationInstance instance) public Task PurgeOrchestrationInstanceHistoryAsync(DateTime thresholdDateTimeUtc, OrchestrationStateTimeRangeFilterType timeRangeFilterType) { - return this.serviceClient.PurgeOrchestrationInstanceHistoryAsync(thresholdDateTimeUtc, timeRangeFilterType); + return this.serviceClient.PurgeOrchestrationHistoryAsync(thresholdDateTimeUtc, timeRangeFilterType); } } } \ No newline at end of file diff --git a/Framework/Tracking/AzureStorageBlobStore.cs b/Framework/Tracking/AzureStorageBlobStore.cs new file mode 100644 index 000000000..57058f4d5 --- /dev/null +++ b/Framework/Tracking/AzureStorageBlobStore.cs @@ -0,0 +1,104 @@ +// ---------------------------------------------------------------------------------- +// Copyright Microsoft Corporation +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ---------------------------------------------------------------------------------- + +namespace DurableTask.Tracking +{ + using System; + using System.IO; + using System.Threading.Tasks; + + /// + /// Azure blob storage to allow save and load large blobs, such as message and session, as a stream using Azure blob container. + /// + public class AzureStorageBlobStore : IOrchestrationServiceBlobStore + { + /// + /// The client to access and manage the blob store + /// + readonly BlobStorageClient blobClient; + + /// + /// Creates a new AzureStorageBlobStore using the supplied hub name and connection string + /// + /// The hub name for this store + /// Azure storage connection string + public AzureStorageBlobStore(string hubName, string connectionString) + { + this.blobClient = new BlobStorageClient(hubName, connectionString); + } + + /// + /// Create a blob storage access key based on the orchestrationInstance. + /// This key will be used to save and load the stream message in external storage when it is too large. + /// + /// The orchestration instance. + /// The message fire time. + /// The created blob key. + public string BuildMessageBlobKey(OrchestrationInstance orchestrationInstance, DateTime messageFireTime) + { + return BlobStorageClientHelper.BuildMessageBlobKey( + orchestrationInstance != null ? orchestrationInstance.InstanceId : "null", + orchestrationInstance != null ? orchestrationInstance.ExecutionId : "null", + messageFireTime); + } + + /// + /// Create a blob storage access key based on message session. + /// This key will be used to save and load the stream in external storage when it is too large. + /// + /// The message session Id. + /// A blob key. + public string BuildSessionBlobKey(string sessionId) + { + return BlobStorageClientHelper.BuildSessionBlobKey(sessionId); + } + + /// + /// Save the stream of the message or seesion using key. + /// + /// The blob key. + /// The stream of the message or session. + /// + public Task SaveStreamAsync(string blobKey, Stream stream) + { + return this.blobClient.UploadStreamBlobAsync(blobKey, stream); + } + + /// + /// Load the stream of message or seesion from storage using key. + /// + /// The blob key. + /// The saved stream message or session. + public Task LoadStreamAsync(string blobKey) + { + return this.blobClient.DownloadStreamAsync(blobKey); + } + + /// + /// Deletes the Azure blob storage + /// + public Task DeleteStoreAsync() + { + return this.blobClient.DeleteBlobStoreContainersAsync(); + } + + /// + /// Purges history from storage for a given time threshold + /// + /// The datetime in UTC to use as the threshold for purging history + public Task PurgeExpiredBlobsAsync(DateTime thresholdDateTimeUtc) + { + return this.blobClient.DeleteExpiredContainersAsync(thresholdDateTimeUtc); + } + } +} diff --git a/Framework/Tracking/BlobStorageClient.cs b/Framework/Tracking/BlobStorageClient.cs new file mode 100644 index 000000000..f71660aa2 --- /dev/null +++ b/Framework/Tracking/BlobStorageClient.cs @@ -0,0 +1,140 @@ +// ---------------------------------------------------------------------------------- +// Copyright Microsoft Corporation +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ---------------------------------------------------------------------------------- + +namespace DurableTask.Tracking +{ + using System; + using System.Collections.Generic; + using System.IO; + using System.Linq; + using System.Threading.Tasks; + using Microsoft.WindowsAzure.Storage; + using Microsoft.WindowsAzure.Storage.Blob; + using Microsoft.WindowsAzure.Storage.RetryPolicies; + + /// + /// A client to access the Azure blob storage. + /// + public class BlobStorageClient + { + // container prefix is in the format of {hubName}-dtfx. It is not part of the blob key. + // the container full name is in the format of {hubName}-dtfx-{streamType}-{DateTime}; + // the streamType is the type of the stream, either 'message' or 'session'; + // the date time is in the format of yyyyMMdd. + readonly string containerNamePrefix; + readonly CloudBlobClient blobClient; + + const int MaxRetries = 3; + static readonly TimeSpan MaximumExecutionTime = TimeSpan.FromSeconds(30); + static readonly TimeSpan DeltaBackOff = TimeSpan.FromSeconds(5); + + /// + /// Construct a blob storage client instance with hub name and connection string + /// + /// The hub name + /// The connection string + public BlobStorageClient(string hubName, string connectionString) + { + if (string.IsNullOrEmpty(connectionString)) + { + throw new ArgumentException("Invalid connection string", nameof(connectionString)); + } + + if (string.IsNullOrEmpty(hubName)) + { + throw new ArgumentException("Invalid hub name", nameof(hubName)); + } + + this.blobClient = CloudStorageAccount.Parse(connectionString).CreateCloudBlobClient(); + this.blobClient.DefaultRequestOptions.RetryPolicy = new ExponentialRetry(DeltaBackOff, MaxRetries); + this.blobClient.DefaultRequestOptions.MaximumExecutionTime = MaximumExecutionTime; + + // make the hub name lower case since it will be used as part of the prefix of the container name, + // which only allows lower case letters + this.containerNamePrefix = BlobStorageClientHelper.BuildContainerNamePrefix(hubName.ToLower()); + } + + /// + /// Upload the stream into the blob storage using the specified key. + /// + /// The key to uniquely locate and access the blob + /// The stream to be uploaded + /// + public async Task UploadStreamBlobAsync(string key, Stream stream) + { + string containerNameSuffix; + string blobName; + BlobStorageClientHelper.ParseKey(key, out containerNameSuffix, out blobName); + var cloudBlob = await this.GetCloudBlockBlobReferenceAsync(containerNameSuffix, blobName); + await cloudBlob.UploadFromStreamAsync(stream); + } + + /// + /// Download the blob from the storage using key. + /// + /// The key to uniquely locate and access the blob + /// A downloaded stream + public async Task DownloadStreamAsync(string key) + { + string containerNameSuffix; + string blobName; + BlobStorageClientHelper.ParseKey(key, out containerNameSuffix, out blobName); + + var cloudBlob = await this.GetCloudBlockBlobReferenceAsync(containerNameSuffix, blobName); + Stream targetStream = new MemoryStream(); + await cloudBlob.DownloadToStreamAsync(targetStream); + targetStream.Position = 0; + return targetStream; + } + + async Task GetCloudBlockBlobReferenceAsync(string containerNameSuffix, string blobName) + { + string containerName = BlobStorageClientHelper.BuildContainerName(this.containerNamePrefix, containerNameSuffix); + var cloudBlobContainer = this.blobClient.GetContainerReference(containerName); + await cloudBlobContainer.CreateIfNotExistsAsync(); + return cloudBlobContainer.GetBlockBlobReference(blobName); + } + + /// + /// List all containers of the blob storage, whose prefix is containerNamePrefix, i.e., {hubName}-dtfx. + /// + /// A list of Azure blob containers + public IEnumerable ListContainers() + { + return this.blobClient.ListContainers(this.containerNamePrefix); + } + + /// + /// Delete all contianers that are older than the input threshold date. + /// + /// The specified date threshold + /// + public async Task DeleteExpiredContainersAsync(DateTime thresholdDateTimeUtc) + { + IEnumerable containers = ListContainers(); + var tasks = containers.Where(container => BlobStorageClientHelper.IsContainerExpired(container.Name, thresholdDateTimeUtc)).ToList().Select(container => container.DeleteIfExistsAsync()); + await Task.WhenAll(tasks); + } + + /// + /// Delete blob containers with the containerNamePrefix as prefix. + /// + /// + public async Task DeleteBlobStoreContainersAsync() + { + IEnumerable containers = this.ListContainers(); + var tasks = containers.ToList().Select(container => container.DeleteIfExistsAsync()); + await Task.WhenAll(tasks); + } + } +} diff --git a/Framework/Tracking/BlobStorageClientHelper.cs b/Framework/Tracking/BlobStorageClientHelper.cs new file mode 100644 index 000000000..277b0f22d --- /dev/null +++ b/Framework/Tracking/BlobStorageClientHelper.cs @@ -0,0 +1,192 @@ +// ---------------------------------------------------------------------------------- +// Copyright Microsoft Corporation +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ---------------------------------------------------------------------------------- + +using System.Globalization; + +namespace DurableTask.Tracking +{ + using System; + using System.Diagnostics; + using System.Text.RegularExpressions; + using DurableTask.Tracing; + + /// + /// A helper class for the Azure blob storage client. + /// + public class BlobStorageClientHelper + { + static readonly string DateFormat = "yyyyMMdd"; + static readonly char ContainerNameDelimiter = '-'; + + /// + /// the blob storage accesss key is in the format of {DateTime}|{blobName} + /// + public static readonly char KeyDelimiter = '|'; + + /// + /// the delimiter shown in the blob name as the file path + /// + public static readonly char BlobNameDelimiter = '/'; + + /// + /// Build a blob key for the message. + /// + /// The orchestration instance Id + /// The orchestration execution Id + /// The message fire time. If it is DateTime.MinValue, use current date. + /// The constructed blob key for message + public static string BuildMessageBlobKey(string instanceId, string executionId, DateTime messageFireTime) + { + string id = Guid.NewGuid().ToString("N"); + return string.Format( + "{0}{1}{2}{3}{4}{3}{5}", + BuildContainerNameSuffix("message", messageFireTime), + KeyDelimiter, + instanceId, + BlobNameDelimiter, + executionId, + id); + } + + static string BuildContainerNameSuffix(string containerType, DateTime blobCreationTime) + { + return $"{containerType.ToLower()}{ContainerNameDelimiter}{GetDateStringForContainerName(blobCreationTime)}"; + } + + /// + /// Build the container name prefix using the lower case hub name. + /// It is in the format of {hubName}-dtfx. + /// The container name prefix is not part of the generated blob key. + /// + /// The hub name. Converted to lower case to build the prefix. + /// The container name prefix + public static string BuildContainerNamePrefix(string hubName) + { + return $"{hubName.ToLower()}{ContainerNameDelimiter}dtfx"; + } + + /// + /// Build a blob key for the session. + /// + /// The session Id + /// The constructed blob key for session + public static string BuildSessionBlobKey(string sessionId) + { + string id = Guid.NewGuid().ToString("N"); + return string.Format( + "{0}{1}{2}{3}{4}", + BuildContainerNameSuffix("session", DateTime.MinValue), + KeyDelimiter, + sessionId, + BlobNameDelimiter, + id); + } + + // use the message fire time if it is set; + // otherwise, use the current utc time as the date string as part of the container name + static string GetDateStringForContainerName(DateTime messageFireTime) + { + return messageFireTime != DateTime.MinValue ? + messageFireTime.ToString(DateFormat) : + DateTime.UtcNow.ToString(DateFormat); + } + + /// + /// Parse the key for the container name suffix and the blob name. + /// + /// The input blob key + /// The parsed container name suffix as output + /// The parsed blob name as output + public static void ParseKey(string key, out string containerNameSuffix, out string blobName) + { + string[] segments = key.Split(new[] {BlobStorageClientHelper.KeyDelimiter}, 2); + if (segments.Length < 2) + { + throw new ArgumentException($"Blob key {key} does not contain required 2 or more segments: containerNameSuffix|blobName.", nameof(key)); + } + + containerNameSuffix = segments[0]; + if (!IsValidContainerNameSuffix(containerNameSuffix)) + { + throw new ArgumentException( + $"Not a valid container name suffix: {containerNameSuffix}. " + + "Container name suffix can contain only lower case letters, numbers, and the dash (-) character.", + nameof(containerNameSuffix)); + } + + blobName = segments[1]; + } + + /// + /// Validate the container name suffix. + /// Container name suffix can contain only lower case letters, numbers, and the dash (-) character. + /// + /// + /// True if the container name suffix is valid. + static bool IsValidContainerNameSuffix(string containerNameSuffix) + { + Regex regex = new Regex(@"^[a-z0-9\\-]+$"); + return regex.Match(containerNameSuffix).Success; + } + + /// + /// Check if the container is expired. + /// + /// The container name + /// The specified date threshold + /// + public static bool IsContainerExpired(string containerName, DateTime thresholdDateTimeUtc) + { + string[] segments = containerName.Split(ContainerNameDelimiter); + if (segments.Length != 4) + { + TraceHelper.Trace( + TraceEventType.Warning, + $"Container name {containerName} does not contain required 4 segments. Container {containerName} is ignored."); + + return false; + } + + DateTime containerDateTime; + string dateString = segments[segments.Length - 1]; + bool parseSucceeded = DateTime.TryParseExact( + dateString, + DateFormat, + System.Globalization.CultureInfo.InvariantCulture, + DateTimeStyles.None, + out containerDateTime); + + if (!parseSucceeded) + { + TraceHelper.Trace( + TraceEventType.Warning, + $"Cannot parse the the date string {dateString} in the format of yyyyMMdd. Container {containerName} is ignored."); + + return false; + } + + return containerDateTime < thresholdDateTimeUtc; + } + + /// + /// Build a container name using prefix and suffix. + /// + /// The container name prefix + /// The container name suffix + /// The container name + public static string BuildContainerName(string prefix, string suffix) + { + return $"{prefix}{ContainerNameDelimiter}{suffix}"; + } + } +} diff --git a/Framework/Tracking/IOrchestrationServiceBlobStore.cs b/Framework/Tracking/IOrchestrationServiceBlobStore.cs new file mode 100644 index 000000000..b830a228c --- /dev/null +++ b/Framework/Tracking/IOrchestrationServiceBlobStore.cs @@ -0,0 +1,70 @@ +// ---------------------------------------------------------------------------------- +// Copyright Microsoft Corporation +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ---------------------------------------------------------------------------------- + +namespace DurableTask.Tracking +{ + using System; + using System.IO; + using System.Threading.Tasks; + + /// + /// Interface to allow save and load large blobs, such as message and session, as a stream using a storage store. + /// The blob is saved in the store using an access key (e.g., a path to the blob), + /// which can be used to uniquely load the blob back. + /// + public interface IOrchestrationServiceBlobStore + { + /// + /// Create a blob storage access key based on the orchestrationInstance. + /// This key will be used to save and load the stream message in external storage when it is too large. + /// + /// The orchestration instance. + /// The message fire time. Could be DateTime.MinValue. + /// A message blob key. + string BuildMessageBlobKey(OrchestrationInstance orchestrationInstance, DateTime messageFireTime); + + /// + /// Create a blob storage access key based on message session. + /// This key will be used to save and load the stream in external storage when it is too large. + /// + /// The message session Id. + /// A blob key. + string BuildSessionBlobKey(string sessionId); + + /// + /// Save the stream of the message or seesion using key. + /// + /// The blob key. + /// The stream of the message or session. + /// + Task SaveStreamAsync(string blobKey, Stream stream); + + /// + /// Load the stream of message or seesion from storage using key. + /// + /// The blob key. + /// The saved stream message or session. + Task LoadStreamAsync(string blobKey); + + /// + /// Deletes the blob store + /// + Task DeleteStoreAsync(); + + /// + /// Purges expired containers from storage for given time threshold + /// + /// The datetime in UTC to use as the threshold for purging containers + Task PurgeExpiredBlobsAsync(DateTime thresholdDateTimeUtc); + } +} diff --git a/FrameworkUnitTests/BlobStorageClientHelperTest.cs b/FrameworkUnitTests/BlobStorageClientHelperTest.cs new file mode 100644 index 000000000..39bb39a93 --- /dev/null +++ b/FrameworkUnitTests/BlobStorageClientHelperTest.cs @@ -0,0 +1,108 @@ +// ---------------------------------------------------------------------------------- +// Copyright Microsoft Corporation +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ---------------------------------------------------------------------------------- + +namespace FrameworkUnitTests +{ + using System; + using System.Text.RegularExpressions; + using Microsoft.VisualStudio.TestTools.UnitTesting; + using DurableTask.Tracking; + + [TestClass] + public class BlobStorageClientHelperTest + { + [TestMethod] + public void IsContainerExpiredTest() + { + Assert.AreEqual("ab-cd", BlobStorageClientHelper.BuildContainerName("ab", "cd")); + + Assert.IsTrue(BlobStorageClientHelper.IsContainerExpired("hubname-dtfx-message-20100101", DateTime.UtcNow)); + Assert.IsFalse(BlobStorageClientHelper.IsContainerExpired("hubname-dtfx-session-20990101", DateTime.UtcNow)); + + DateTime dateTime = new DateTime(2015, 05, 17); + Assert.IsTrue(BlobStorageClientHelper.IsContainerExpired("hubname-dtfx-message-20150516", dateTime)); + Assert.IsFalse(BlobStorageClientHelper.IsContainerExpired("hubname-dtfx-message-20150517", dateTime)); + Assert.IsFalse(BlobStorageClientHelper.IsContainerExpired("hubname-dtfx-message-20150518", dateTime)); + Assert.IsTrue(BlobStorageClientHelper.IsContainerExpired("hubname-dtfx-message-20140518", dateTime)); + + // invalid containers are ignored + Assert.IsFalse(BlobStorageClientHelper.IsContainerExpired("invalidContainerName", DateTime.UtcNow)); + Assert.IsFalse(BlobStorageClientHelper.IsContainerExpired("hubname-dtfx-message-20146789", DateTime.UtcNow)); + } + + [TestMethod] + public void BuildMessageBlobKeyTest() + { + string instanceId = "aa"; + string executionId = "bb"; + DateTime messageFireTime = new DateTime(2015, 05, 17); + string key = BlobStorageClientHelper.BuildMessageBlobKey(instanceId, executionId, messageFireTime); + Regex regex = new Regex(@"message-20150517|aa/bb/\w{32}$"); + Assert.IsTrue(regex.Match(key).Success); + + key = BlobStorageClientHelper.BuildMessageBlobKey(instanceId, executionId, DateTime.MinValue); + regex = new Regex(@"message-\d{8}|aa/bb/\w{32}$"); + Assert.IsTrue(regex.Match(key).Success); + } + + [TestMethod] + public void BuildSessionBlobKeyTest() + { + string sessionId = "abc"; + string key = BlobStorageClientHelper.BuildSessionBlobKey(sessionId); + Regex regex = new Regex(@"^session-\d{8}|abc/\w{32}$"); + Assert.IsTrue(regex.Match(key).Success); + } + + [TestMethod] + public void BuildContainerNamePrefixTest() + { + string hubName = "HubName"; + string containerNamePrefix = BlobStorageClientHelper.BuildContainerNamePrefix(hubName); + Assert.AreEqual("hubname-dtfx", containerNamePrefix); + } + + [TestMethod] + public void ParseKeyTest() + { + string key = "message-20100319|aa/bb/cc"; + string containerSuffix; + string blobName; + BlobStorageClientHelper.ParseKey(key, out containerSuffix, out blobName); + + Assert.AreEqual("message-20100319", containerSuffix); + Assert.AreEqual("aa/bb/cc", blobName); + + try + { + BlobStorageClientHelper.ParseKey("invalidKey", out containerSuffix, out blobName); + Assert.Fail("ArgumentException must be thrown"); + } + catch (ArgumentException e) + { + Assert.IsTrue(e.Message.Contains("key"), "Exception must contain key."); + } + + try + { + // invalid container name suffix: only lower case letters and numbers are allowed + BlobStorageClientHelper.ParseKey("Message-20100319|aa/bb/cc", out containerSuffix, out blobName); + Assert.Fail("ArgumentException must be thrown"); + } + catch (ArgumentException e) + { + Assert.IsTrue(e.Message.Contains("Message-20100319"), "Exception must contain the invalid container name suffix."); + } + } + } +} diff --git a/FrameworkUnitTests/BlobStorageClientTest.cs b/FrameworkUnitTests/BlobStorageClientTest.cs new file mode 100644 index 000000000..32bf875fd --- /dev/null +++ b/FrameworkUnitTests/BlobStorageClientTest.cs @@ -0,0 +1,91 @@ +// ---------------------------------------------------------------------------------- +// Copyright Microsoft Corporation +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ---------------------------------------------------------------------------------- + +namespace FrameworkUnitTests +{ + using System; + using System.Collections.Generic; + using System.IO; + using System.Linq; + using System.Text; + using System.Threading.Tasks; + using Microsoft.VisualStudio.TestTools.UnitTesting; + using Microsoft.WindowsAzure.Storage.Blob; + using DurableTask.Tracking; + + [TestClass] + public class BlobStorageClientTest + { + private BlobStorageClient blobStorageClient; + + [TestInitialize] + public void TestInitialize() + { + var r = new Random(); + blobStorageClient = new BlobStorageClient( + "Test00" + r.Next(0, 10000), + TestHelpers.GetTestSetting("StorageConnectionString")); + } + + [TestCleanup] + public void TestCleanup() + { + List containers = blobStorageClient.ListContainers().ToList(); + containers.ForEach(container => container.DeleteIfExists()); + containers = blobStorageClient.ListContainers().ToList(); + Assert.AreEqual(0, containers.Count); + } + + [TestMethod] + public async Task TestStreamBlobCreationAndDeletion() + { + string testContent = "test stream content"; + string key = "message-20101003|testBlobName"; + Stream stream = new MemoryStream(Encoding.UTF8.GetBytes(testContent)); + await blobStorageClient.UploadStreamBlobAsync(key, stream); + + MemoryStream result = await blobStorageClient.DownloadStreamAsync(key) as MemoryStream; + string resultString = Encoding.UTF8.GetString(result.ToArray()); + Assert.AreEqual(resultString, testContent); + } + + [TestMethod] + public async Task TestDeleteContainers() + { + string testContent = "test stream content"; + string key1 = "message-20150516|a"; + string key2 = "message-20150517|b"; + string key3 = "message-20150518|c"; + + Stream stream = new MemoryStream(Encoding.UTF8.GetBytes(testContent)); + await blobStorageClient.UploadStreamBlobAsync(key1, stream); + await blobStorageClient.UploadStreamBlobAsync(key2, stream); + await blobStorageClient.UploadStreamBlobAsync(key3, stream); + + DateTime dateTime = new DateTime(2015, 05, 17); + await blobStorageClient.DeleteExpiredContainersAsync(dateTime); + + List containers = blobStorageClient.ListContainers().ToList(); + Assert.AreEqual(2, containers.Count); + List sortedList = new List {containers[0].Name, containers[1].Name}; + sortedList.Sort(); + + Assert.IsTrue(sortedList[0].EndsWith("20150517")); + Assert.IsTrue(sortedList[1].EndsWith("20150518")); + + await blobStorageClient.DeleteBlobStoreContainersAsync(); + containers = blobStorageClient.ListContainers().ToList(); + Assert.AreEqual(0, containers.Count); + } + } +} diff --git a/FrameworkUnitTests/DispatcherTests.cs b/FrameworkUnitTests/DispatcherTests.cs index 8b074a350..8eae84a7e 100644 --- a/FrameworkUnitTests/DispatcherTests.cs +++ b/FrameworkUnitTests/DispatcherTests.cs @@ -11,15 +11,17 @@ // limitations under the License. // ---------------------------------------------------------------------------------- - namespace FrameworkUnitTests { using System; using System.Collections.Generic; using System.Linq; + using System.Text; using System.Threading; using System.Threading.Tasks; using DurableTask; + using DurableTask.Common; + using DurableTask.Settings; using DurableTask.Test; using Microsoft.VisualStudio.TestTools.UnitTesting; @@ -635,16 +637,24 @@ await taskHub.AddTaskOrchestrations(typeof (LargeSessionOrchestration)) .AddTaskActivities(typeof (LargeSessionTaskActivity)) .StartAsync(); - OrchestrationInstance id = await client.CreateOrchestrationInstanceAsync(typeof (LargeSessionOrchestration), 50); + await SessionExceededLimitSubTestWithInputSize(100 * 1024); + await SessionExceededLimitSubTestWithInputSize(200 * 1024); + await SessionExceededLimitSubTestWithInputSize(300 * 1024); + await SessionExceededLimitSubTestWithInputSize(500 * 1024); + await SessionExceededLimitSubTestWithInputSize(1000 * 1024); + } - bool isCompleted = await TestHelpers.WaitForInstanceAsync(client, id, 60, true); + async Task SessionExceededLimitSubTestWithInputSize(int inputSize) + { + string input = TestUtils.GenerateRandomString(inputSize); + OrchestrationInstance id = await client.CreateOrchestrationInstanceAsync(typeof(LargeSessionOrchestration), new Tuple(input, 2)); + bool isCompleted = await TestHelpers.WaitForInstanceAsync(client, id, 60, true); await Task.Delay(20000); OrchestrationState state = await client.GetOrchestrationStateAsync(id); - - Assert.AreEqual(OrchestrationStatus.Terminated, state.OrchestrationStatus); - Assert.IsTrue(state.Output.Contains("exceeded")); + Assert.AreEqual(OrchestrationStatus.Completed, state.OrchestrationStatus); + Assert.AreEqual($"0:{input}-1:{input}-", LargeSessionOrchestration.Result); } [TestMethod] @@ -654,25 +664,63 @@ await taskHub.AddTaskOrchestrations(typeof (LargeSessionOrchestration)) .AddTaskActivities(typeof (LargeSessionTaskActivity)) .StartAsync(); - OrchestrationInstance id = await client.CreateOrchestrationInstanceAsync(typeof (LargeSessionOrchestration), 15); + string input = "abc"; - bool isCompleted = await TestHelpers.WaitForInstanceAsync(client, id, 90, true); + OrchestrationInstance id = await client.CreateOrchestrationInstanceAsync(typeof (LargeSessionOrchestration), new Tuple(input, 2)); + + bool isCompleted = await TestHelpers.WaitForInstanceAsync(client, id, 60, true); await Task.Delay(20000); OrchestrationState state = await client.GetOrchestrationStateAsync(id); Assert.AreEqual(OrchestrationStatus.Completed, state.OrchestrationStatus); + Assert.AreEqual($"0:{input}-1:{input}-", LargeSessionOrchestration.Result); } [TestMethod] public async Task SessionExceededLimitNoCompressionTest() { - await taskHubNoCompression.AddTaskOrchestrations(typeof (LargeSessionOrchestration)) + string input = TestUtils.GenerateRandomString(150 * 1024); + + ServiceBusOrchestrationService serviceBusOrchestrationService = + taskHub.orchestrationService as ServiceBusOrchestrationService; + serviceBusOrchestrationService.Settings.TaskOrchestrationDispatcherSettings.CompressOrchestrationState = + false; + + await taskHub.AddTaskOrchestrations(typeof (LargeSessionOrchestration)) .AddTaskActivities(typeof (LargeSessionTaskActivity)) .StartAsync(); - OrchestrationInstance id = await client.CreateOrchestrationInstanceAsync(typeof (LargeSessionOrchestration), 15); + OrchestrationInstance id = await client.CreateOrchestrationInstanceAsync(typeof (LargeSessionOrchestration), new Tuple(input, 2)); + + bool isCompleted = await TestHelpers.WaitForInstanceAsync(client, id, 60, true); + + await Task.Delay(20000); + + OrchestrationState state = await client.GetOrchestrationStateAsync(id); + + Assert.AreEqual(OrchestrationStatus.Completed, state.OrchestrationStatus); + Assert.AreEqual($"0:{input}-1:{input}-", LargeSessionOrchestration.Result); + } + + [TestMethod] + public async Task MessageExceededLimitNoCompressionTest() + { + string input = TestUtils.GenerateRandomString(150 * 1024); + + ServiceBusOrchestrationService serviceBusOrchestrationService = client.serviceClient as ServiceBusOrchestrationService; + serviceBusOrchestrationService.Settings.MessageCompressionSettings = new CompressionSettings + { + Style = CompressionStyle.Never, + ThresholdInBytes = 0 + }; + + await taskHub.AddTaskOrchestrations(typeof(LargeSessionOrchestration)) + .AddTaskActivities(typeof(LargeSessionTaskActivity)) + .StartAsync(); + + OrchestrationInstance id = await client.CreateOrchestrationInstanceAsync(typeof(LargeSessionOrchestration), new Tuple(input, 2)); bool isCompleted = await TestHelpers.WaitForInstanceAsync(client, id, 60, true); @@ -680,35 +728,56 @@ await taskHubNoCompression.AddTaskOrchestrations(typeof (LargeSessionOrchestrati OrchestrationState state = await client.GetOrchestrationStateAsync(id); + Assert.AreEqual(OrchestrationStatus.Completed, state.OrchestrationStatus); + Assert.AreEqual($"0:{input}-1:{input}-", LargeSessionOrchestration.Result); + } + + [TestMethod] + public async Task SessionExceededTerminationLimitTest() + { + string input = TestUtils.GenerateRandomString(200 * 1024); + + await taskHub.AddTaskOrchestrations(typeof(LargeSessionOrchestration)) + .AddTaskActivities(typeof(LargeSessionTaskActivity)) + .StartAsync(); + + ServiceBusOrchestrationService serviceBusOrchestrationService = + taskHub.orchestrationService as ServiceBusOrchestrationService; + serviceBusOrchestrationService.Settings.SessionSettings = new ServiceBusSessionSettings(230 * 1024, 1024 * 1024); + + OrchestrationInstance id = await client.CreateOrchestrationInstanceAsync(typeof(LargeSessionOrchestration), new Tuple(input, 10)); + bool isCompleted = await TestHelpers.WaitForInstanceAsync(client, id, 60, true); + await Task.Delay(20000); + + OrchestrationState state = await client.GetOrchestrationStateAsync(id); Assert.AreEqual(OrchestrationStatus.Terminated, state.OrchestrationStatus); Assert.IsTrue(state.Output.Contains("exceeded")); } - public class LargeSessionOrchestration : TaskOrchestration + public class LargeSessionOrchestration : TaskOrchestration> { - public override async Task RunTask(OrchestrationContext context, int input) + // HACK: This is just a hack to communicate result of orchestration back to test + public static string Result; + + public override async Task RunTask(OrchestrationContext context, Tuple input) { - for (int i = 0; i < input; i++) + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < input.Item2; i++) { - await context.ScheduleTask(typeof (LargeSessionTaskActivity)); + string outputI = await context.ScheduleTask(typeof(LargeSessionTaskActivity), $"{i}:{input.Item1}"); + sb.Append($"{outputI}-"); } - return string.Empty; + Result = sb.ToString(); + return Result; } } - public sealed class LargeSessionTaskActivity : TaskActivity + public sealed class LargeSessionTaskActivity : TaskActivity { - protected override byte[] Execute(TaskContext context, string input) + protected override string Execute(TaskContext context, string input) { - var arr = new byte[16000]; - - for (int i = 0; i < 1000; i++) - { - Guid.NewGuid().ToByteArray().CopyTo(arr, i*16); - } - - return arr; + return input; } } diff --git a/FrameworkUnitTests/DurableTaskFrameworkUnitTests.csproj b/FrameworkUnitTests/DurableTaskFrameworkUnitTests.csproj index 3bf714981..148a84f1f 100644 --- a/FrameworkUnitTests/DurableTaskFrameworkUnitTests.csproj +++ b/FrameworkUnitTests/DurableTaskFrameworkUnitTests.csproj @@ -119,6 +119,8 @@ + + @@ -129,6 +131,7 @@ + @@ -138,6 +141,7 @@ + diff --git a/FrameworkUnitTests/FunctionalTests.cs b/FrameworkUnitTests/FunctionalTests.cs index df60c95ad..4f6c9cd00 100644 --- a/FrameworkUnitTests/FunctionalTests.cs +++ b/FrameworkUnitTests/FunctionalTests.cs @@ -709,7 +709,7 @@ await taskHub.AddTaskOrchestrations(c1, c2) "UberOrchestration", "V1", "TestInstance", - new TestOrchestrationInput { Iterations = numSubOrchestrations, Payload = GeneratePayLoad(90 * 1024) }); + new TestOrchestrationInput { Iterations = numSubOrchestrations, Payload = TestUtils.GenerateRandomString(90 * 1024) }); // Waiting for 60 seconds guarantees that to pass the orchestrations must run in parallel bool isCompleted = await TestHelpers.WaitForInstanceAsync(client, instance, 60); @@ -735,7 +735,7 @@ await taskHub.AddTaskOrchestrations(c2) "SleeperSubOrchestration", "V1", $"{UberOrchestration.ChildWorkflowIdBase}_{i}", - new TestOrchestrationInput { Iterations = 1, Payload = GeneratePayLoad(8 * 1024) })); + new TestOrchestrationInput { Iterations = 1, Payload = TestUtils.GenerateRandomString(8 * 1024) })); } IList orchestrationInstances = (await Task.WhenAll(orchestrations)).ToList(); @@ -750,18 +750,6 @@ await taskHub.AddTaskOrchestrations(c2) Assert.AreEqual(numSubOrchestrations, finalResults.Count(status => status.OrchestrationStatus == OrchestrationStatus.Completed)); } - private static string GeneratePayLoad(int length) - { - var result = new StringBuilder(length); - while (result.Length < length) - { - // Use Guids so these don't compress well - result.Append(Guid.NewGuid().ToString("N")); - } - - return result.ToString(0, length); - } - class TestOrchestrationInput { public int Iterations { get; set; } @@ -793,7 +781,7 @@ public override async Task RunTask(OrchestrationContext context, TestOrches "SleeperSubOrchestration", "V1", $"{ChildWorkflowIdBase}_{i}", - new TestOrchestrationInput { Iterations = 1, Payload = GeneratePayLoad(8 * 1024) })); + new TestOrchestrationInput { Iterations = 1, Payload = TestUtils.GenerateRandomString(8 * 1024) })); } int[] data = await Task.WhenAll(tasks); diff --git a/FrameworkUnitTests/Mocks/LocalOrchestrationService.cs b/FrameworkUnitTests/Mocks/LocalOrchestrationService.cs index 0b781623e..9a01959e7 100644 --- a/FrameworkUnitTests/Mocks/LocalOrchestrationService.cs +++ b/FrameworkUnitTests/Mocks/LocalOrchestrationService.cs @@ -314,7 +314,7 @@ public Task GetOrchestrationHistoryAsync(string instanceId, string execu throw new NotImplementedException(); } - public Task PurgeOrchestrationInstanceHistoryAsync(DateTime thresholdDateTimeUtc, OrchestrationStateTimeRangeFilterType timeRangeFilterType) + public Task PurgeOrchestrationHistoryAsync(DateTime thresholdDateTimeUtc, OrchestrationStateTimeRangeFilterType timeRangeFilterType) { throw new NotImplementedException(); } diff --git a/FrameworkUnitTests/RuntimeStateStreamConverterTest.cs b/FrameworkUnitTests/RuntimeStateStreamConverterTest.cs new file mode 100644 index 000000000..088d24d9f --- /dev/null +++ b/FrameworkUnitTests/RuntimeStateStreamConverterTest.cs @@ -0,0 +1,246 @@ +// ---------------------------------------------------------------------------------- +// Copyright Microsoft Corporation +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ---------------------------------------------------------------------------------- + +namespace FrameworkUnitTests +{ + using System; + using System.Collections.Generic; + using System.IO; + using System.Threading.Tasks; + using Microsoft.VisualStudio.TestTools.UnitTesting; + using DurableTask; + using DurableTask.Common; + using DurableTask.Exceptions; + using DurableTask.History; + using DurableTask.Serializing; + using DurableTask.Settings; + using DurableTask.Tracking; + + [TestClass] + public class RuntimeStateStreamConverterTest + { + const int sessionOverflowThresholdInBytes = 2 * 1024; + const int sessionMaxSizeInBytes = 10 * 1024; + const string sessionId = "session123"; + ServiceBusSessionSettings serviceBusSessionSettings = new ServiceBusSessionSettings(sessionOverflowThresholdInBytes, sessionMaxSizeInBytes); + + AzureTableInstanceStore azureTableInstanceStore; + AzureStorageBlobStore azureStorageBlobStore; + + [TestInitialize] + public void TestInitialize() + { + azureTableInstanceStore = TestHelpers.CreateAzureTableInstanceStore(); + azureStorageBlobStore = TestHelpers.CreateAzureStorageBlobStore(); + } + + [TestCleanup] + public void TestCleanup() + { + azureTableInstanceStore.DeleteStoreAsync().Wait(); + azureStorageBlobStore.DeleteStoreAsync().Wait(); + } + + [TestMethod] + public async Task SmallRuntimeStateConverterTest() + { + string smallInput = "abc"; + + OrchestrationRuntimeState newOrchestrationRuntimeStateSmall = generateOrchestrationRuntimeState(smallInput); + OrchestrationRuntimeState runtimeState = new OrchestrationRuntimeState(); + DataConverter dataConverter = new JsonDataConverter(); + + // a small runtime state doesn't need external storage. + Stream rawStreamSmall = await RuntimeStateStreamConverter.OrchestrationRuntimeStateToRawStream( + newOrchestrationRuntimeStateSmall, + runtimeState, + dataConverter, + true, + serviceBusSessionSettings, + azureStorageBlobStore, + sessionId); + OrchestrationRuntimeState convertedRuntimeStateSmall = await RuntimeStateStreamConverter.RawStreamToRuntimeState(rawStreamSmall, "sessionId", azureStorageBlobStore, dataConverter); + verifyEventInput(smallInput, convertedRuntimeStateSmall); + + // test for un-compress case + Stream rawStreamSmall2 = await RuntimeStateStreamConverter.OrchestrationRuntimeStateToRawStream( + newOrchestrationRuntimeStateSmall, + runtimeState, + dataConverter, + false, + serviceBusSessionSettings, + azureStorageBlobStore, + sessionId); + OrchestrationRuntimeState convertedRuntimeStateSmall2 = await RuntimeStateStreamConverter.RawStreamToRuntimeState(rawStreamSmall2, "sessionId", azureStorageBlobStore, dataConverter); + verifyEventInput(smallInput, convertedRuntimeStateSmall2); + + // test for backward comp: ok for an un-implemented (or null) IBlobStorage for small runtime states + Stream rawStreamSmall3 = await RuntimeStateStreamConverter.OrchestrationRuntimeStateToRawStream( + newOrchestrationRuntimeStateSmall, + runtimeState, + dataConverter, + true, + serviceBusSessionSettings, + null, + sessionId); + OrchestrationRuntimeState convertedRuntimeStateSmall3 = await RuntimeStateStreamConverter.RawStreamToRuntimeState(rawStreamSmall3, "sessionId", null, dataConverter); + verifyEventInput(smallInput, convertedRuntimeStateSmall3); + } + + [TestMethod] + public async Task LargeRuntimeStateConverterTest() + { + string largeInput = TestUtils.GenerateRandomString(5 * 1024); + OrchestrationRuntimeState newOrchestrationRuntimeStateLarge = generateOrchestrationRuntimeState(largeInput); + OrchestrationRuntimeState runtimeState = new OrchestrationRuntimeState(); + DataConverter dataConverter = new JsonDataConverter(); + + // a large runtime state that needs external storage. + Stream rawStreamLarge = await RuntimeStateStreamConverter.OrchestrationRuntimeStateToRawStream( + newOrchestrationRuntimeStateLarge, + runtimeState, + dataConverter, + true, + serviceBusSessionSettings, + azureStorageBlobStore, + sessionId); + OrchestrationRuntimeState convertedRuntimeStateLarge = await RuntimeStateStreamConverter.RawStreamToRuntimeState(rawStreamLarge, "sessionId", azureStorageBlobStore, dataConverter); + verifyEventInput(largeInput, convertedRuntimeStateLarge); + + // test for un-compress case + string largeInput2 = TestUtils.GenerateRandomString(3 * 1024); + OrchestrationRuntimeState newOrchestrationRuntimeStateLarge2 = generateOrchestrationRuntimeState(largeInput2); + Stream rawStreamLarge2 = await RuntimeStateStreamConverter.OrchestrationRuntimeStateToRawStream( + newOrchestrationRuntimeStateLarge2, + runtimeState, + dataConverter, + false, + serviceBusSessionSettings, + azureStorageBlobStore, + sessionId); + OrchestrationRuntimeState convertedRuntimeStateLarge2 = await RuntimeStateStreamConverter.RawStreamToRuntimeState(rawStreamLarge2, "sessionId", azureStorageBlobStore, dataConverter); + verifyEventInput(largeInput2, convertedRuntimeStateLarge2); + + // test for an un-implemented (or null) IBlobStorage for large runtime states: should throw exception + try + { + await + RuntimeStateStreamConverter.OrchestrationRuntimeStateToRawStream( + newOrchestrationRuntimeStateLarge, + runtimeState, + dataConverter, + true, + serviceBusSessionSettings, + null, + sessionId); + Assert.Fail("ArgumentException must be thrown"); + } + catch (OrchestrationException e) + { + // expected + Assert.IsTrue(e.Message.Contains("IOrchestrationServiceBlobStore"), "Exception must contain IOrchestrationServiceBlobStore."); + } + } + + [TestMethod] + public async Task VeryLargeRuntimeStateConverterTest() + { + string veryLargeInput = TestUtils.GenerateRandomString(20 * 1024); + OrchestrationRuntimeState newOrchestrationRuntimeStateLarge = generateOrchestrationRuntimeState(veryLargeInput); + OrchestrationRuntimeState runtimeState = new OrchestrationRuntimeState(); + DataConverter dataConverter = new JsonDataConverter(); + + // test for very large size rumtime state that cannot be saved externally: should throw exception + try + { + Stream rawStreamVeryLarge = await RuntimeStateStreamConverter.OrchestrationRuntimeStateToRawStream( + newOrchestrationRuntimeStateLarge, + runtimeState, + dataConverter, + true, + serviceBusSessionSettings, + azureStorageBlobStore, + sessionId); + Assert.Fail("ArgumentException must be thrown"); + } + catch (OrchestrationException e) + { + // expected + Assert.IsTrue(e.Message.Contains("exceeded"), "Exception must contain exceeded."); + } + } + + [TestMethod] + public async Task ConverterCompatabilityTest() + { + string smallInput = "abc"; + OrchestrationRuntimeState newOrchestrationRuntimeStateSmall = generateOrchestrationRuntimeState(smallInput); + OrchestrationRuntimeState runtimeState = new OrchestrationRuntimeState(); + DataConverter dataConverter = new JsonDataConverter(); + + // deserialize a OrchestrationRuntimeState object, with both compression and not compression + Stream stream = serializeToStream(dataConverter, newOrchestrationRuntimeStateSmall, true); + OrchestrationRuntimeState convertedRuntimeStateSmall = await RuntimeStateStreamConverter.RawStreamToRuntimeState(stream, "sessionId", null, dataConverter); + verifyEventInput(smallInput, convertedRuntimeStateSmall); + + stream = serializeToStream(dataConverter, newOrchestrationRuntimeStateSmall, false); + convertedRuntimeStateSmall = await RuntimeStateStreamConverter.RawStreamToRuntimeState(stream, "sessionId", null, dataConverter); + verifyEventInput(smallInput, convertedRuntimeStateSmall); + + // deserialize a IList object, with both compression and not compression + Stream stream2 = serializeToStream(dataConverter, newOrchestrationRuntimeStateSmall.Events, true); + OrchestrationRuntimeState convertedRuntimeStateSmall2 = await RuntimeStateStreamConverter.RawStreamToRuntimeState(stream2, "sessionId", null, dataConverter); + verifyEventInput(smallInput, convertedRuntimeStateSmall2); + + stream2 = serializeToStream(dataConverter, newOrchestrationRuntimeStateSmall.Events, false); + convertedRuntimeStateSmall2 = await RuntimeStateStreamConverter.RawStreamToRuntimeState(stream2, "sessionId", null, dataConverter); + verifyEventInput(smallInput, convertedRuntimeStateSmall2); + } + + private Stream serializeToStream(DataConverter dataConverter, OrchestrationRuntimeState orchestrationRuntimeState, bool shouldCompress) + { + string serializedState = dataConverter.Serialize(orchestrationRuntimeState); + long originalStreamSize = 0; + return Utils.WriteStringToStream( + serializedState, + shouldCompress, + out originalStreamSize); + } + + private Stream serializeToStream(DataConverter dataConverter, IList events, bool shouldCompress) + { + string serializedState = dataConverter.Serialize(events); + long originalStreamSize = 0; + return Utils.WriteStringToStream( + serializedState, + shouldCompress, + out originalStreamSize); + } + + OrchestrationRuntimeState generateOrchestrationRuntimeState(string input) + { + IList historyEvents = new List(); + ExecutionStartedEvent historyEvent = new ExecutionStartedEvent(1, input); + historyEvents.Add(historyEvent); + OrchestrationRuntimeState newOrchestrationRuntimeState = new OrchestrationRuntimeState(historyEvents); + + return newOrchestrationRuntimeState; + } + + void verifyEventInput(string expectedHistoryEventInput, OrchestrationRuntimeState runtimeState) + { + ExecutionStartedEvent executionStartedEvent = runtimeState.Events[0] as ExecutionStartedEvent; + Assert.AreEqual(expectedHistoryEventInput, executionStartedEvent.Input); + } + } +} diff --git a/FrameworkUnitTests/SampleScenarioTests.cs b/FrameworkUnitTests/SampleScenarioTests.cs index 9c8a7625f..5ef3ae851 100644 --- a/FrameworkUnitTests/SampleScenarioTests.cs +++ b/FrameworkUnitTests/SampleScenarioTests.cs @@ -237,6 +237,38 @@ public override async Task RunTask(OrchestrationContext context, int sec #endregion + #region Message Overflow Test for Large Orchestration Input Output + + [TestMethod] + public async Task MessageOverflowTest() + { + await taskHub.AddTaskOrchestrations(typeof(LargeInputOutputOrchestration)).StartAsync(); + + // generate a large string as the orchestration input; + // make it random so that it won't be compressed too much. + var largeInput = TestUtils.GenerateRandomString(1000 * 1024); + OrchestrationInstance id = await client.CreateOrchestrationInstanceAsync(typeof(LargeInputOutputOrchestration), largeInput); + + bool isCompleted = await TestHelpers.WaitForInstanceAsync(client, id, 60); + Assert.IsTrue(isCompleted, TestHelpers.GetInstanceNotCompletedMessage(client, id, 60)); + Assert.AreEqual($"output-{largeInput}", LargeInputOutputOrchestration.Result, "Orchestration Result is wrong!!!"); + } + + public class LargeInputOutputOrchestration : TaskOrchestration + { + // HACK: This is just a hack to communicate result of orchestration back to test + public static string Result; + + public override async Task RunTask(OrchestrationContext context, string input) + { + string output = $"output-{input}"; + Result = output; + return output; + } + } + + #endregion Message Overflow Test for Large Orchestration Input Output + #region AverageCalculator Test [TestMethod] diff --git a/FrameworkUnitTests/TestHelpers.cs b/FrameworkUnitTests/TestHelpers.cs index 625776088..14b747889 100644 --- a/FrameworkUnitTests/TestHelpers.cs +++ b/FrameworkUnitTests/TestHelpers.cs @@ -81,6 +81,7 @@ static IOrchestrationService CreateOrchestrationServiceWorker( ServiceBusConnectionString, TaskHubName, new AzureTableInstanceStore(TaskHubName, StorageConnectionString), + new AzureStorageBlobStore(TaskHubName, StorageConnectionString), settings); return service; } @@ -92,10 +93,21 @@ static IOrchestrationServiceClient CreateOrchestrationServiceClient( ServiceBusConnectionString, TaskHubName, new AzureTableInstanceStore(TaskHubName, StorageConnectionString), + new AzureStorageBlobStore(TaskHubName, StorageConnectionString), settings); return service; } + public static AzureTableInstanceStore CreateAzureTableInstanceStore() + { + return new AzureTableInstanceStore(TaskHubName, StorageConnectionString); + } + + public static AzureStorageBlobStore CreateAzureStorageBlobStore() + { + return new AzureStorageBlobStore(TaskHubName, StorageConnectionString); + } + public static TaskHubClient CreateTaskHubClientNoCompression() { return new TaskHubClient(CreateOrchestrationServiceClient(null)); diff --git a/FrameworkUnitTests/TestUtils.cs b/FrameworkUnitTests/TestUtils.cs new file mode 100644 index 000000000..eee950b3f --- /dev/null +++ b/FrameworkUnitTests/TestUtils.cs @@ -0,0 +1,33 @@ +// ---------------------------------------------------------------------------------- +// Copyright Microsoft Corporation +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ---------------------------------------------------------------------------------- + +namespace FrameworkUnitTests +{ + using System; + using System.Text; + + public static class TestUtils + { + public static string GenerateRandomString(int length) + { + var result = new StringBuilder(length); + while (result.Length < length) + { + // Use Guids so these don't compress well + result.Append(Guid.NewGuid().ToString("N")); + } + + return result.ToString(0, length); + } + } +}