Skip to content

Commit

Permalink
Squashed commit of the following: (#53)
Browse files Browse the repository at this point in the history
Add support for large messages and sessions using overflow to IOrchestrationServiceBlob store with AzureBlobStorage implementation.
  • Loading branch information
zheg authored and simonporter committed Aug 5, 2016
1 parent eae7f4c commit 6481e6a
Show file tree
Hide file tree
Showing 25 changed files with 1,812 additions and 193 deletions.
128 changes: 111 additions & 17 deletions Framework/Common/ServiceBusUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,25 @@ namespace DurableTask.Common
using System.Threading.Tasks;
using DurableTask.Settings;
using DurableTask.Tracing;
using DurableTask.Tracking;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;

internal static class ServiceBusUtils
{
public static BrokeredMessage GetBrokeredMessageFromObject(object serializableObject, CompressionSettings compressionSettings)
public static Task<BrokeredMessage> GetBrokeredMessageFromObjectAsync(object serializableObject, CompressionSettings compressionSettings)
{
return GetBrokeredMessageFromObject(serializableObject, compressionSettings, null, null);
return GetBrokeredMessageFromObjectAsync(serializableObject, compressionSettings, new ServiceBusMessageSettings(), null, null, null, DateTime.MinValue);
}

public static BrokeredMessage GetBrokeredMessageFromObject(
public static async Task<BrokeredMessage> GetBrokeredMessageFromObjectAsync(
object serializableObject,
CompressionSettings compressionSettings,
ServiceBusMessageSettings messageSettings,
OrchestrationInstance instance,
string messageType)
string messageType,
IOrchestrationServiceBlobStore orchestrationServiceBlobStore,
DateTime messageFireTime)
{
if (serializableObject == null)
{
Expand All @@ -46,6 +50,11 @@ public static BrokeredMessage GetBrokeredMessageFromObject(
return new BrokeredMessage(serializableObject) { SessionId = instance?.InstanceId };
}

if (messageSettings == null)
{
messageSettings = new ServiceBusMessageSettings();
}

bool disposeStream = true;
var rawStream = new MemoryStream();

Expand All @@ -60,23 +69,33 @@ public static BrokeredMessage GetBrokeredMessageFromObject(
rawStream.Length > compressionSettings.ThresholdInBytes))
{
Stream compressedStream = Utils.GetCompressedStream(rawStream);

brokeredMessage = new BrokeredMessage(compressedStream, true);
brokeredMessage.Properties[FrameworkConstants.CompressionTypePropertyName] =
FrameworkConstants.CompressionTypeGzipPropertyValue;

var rawLen = rawStream.Length;
TraceHelper.TraceInstance(TraceEventType.Information, instance,
() =>
"Compression stats for " + (messageType ?? string.Empty) + " : " + brokeredMessage.MessageId +
"Compression stats for " + (messageType ?? string.Empty) + " : " +
brokeredMessage.MessageId +
", uncompressed " + rawLen + " -> compressed " + compressedStream.Length);

if (compressedStream.Length < messageSettings.MessageOverflowThresholdInBytes)
{
brokeredMessage = GenerateBrokeredMessageWithCompressionTypeProperty(compressedStream, FrameworkConstants.CompressionTypeGzipPropertyValue);
}
else
{
brokeredMessage = await GenerateBrokeredMessageWithBlobKeyPropertyAsync(compressedStream, orchestrationServiceBlobStore, instance, messageSettings, messageFireTime, FrameworkConstants.CompressionTypeGzipPropertyValue);
}
}
else
{
brokeredMessage = new BrokeredMessage(rawStream, true);
disposeStream = false;
brokeredMessage.Properties[FrameworkConstants.CompressionTypePropertyName] =
FrameworkConstants.CompressionTypeNonePropertyValue;
if (rawStream.Length < messageSettings.MessageOverflowThresholdInBytes)
{
brokeredMessage = GenerateBrokeredMessageWithCompressionTypeProperty(rawStream, FrameworkConstants.CompressionTypeNonePropertyValue);
disposeStream = false;
}
else
{
brokeredMessage = await GenerateBrokeredMessageWithBlobKeyPropertyAsync(rawStream, orchestrationServiceBlobStore, instance, messageSettings, messageFireTime, FrameworkConstants.CompressionTypeNonePropertyValue);
}
}

brokeredMessage.SessionId = instance?.InstanceId;
Expand All @@ -94,7 +113,55 @@ public static BrokeredMessage GetBrokeredMessageFromObject(
}
}

public static async Task<T> GetObjectFromBrokeredMessageAsync<T>(BrokeredMessage message)
static BrokeredMessage GenerateBrokeredMessageWithCompressionTypeProperty(Stream stream, string compressionType)
{
BrokeredMessage brokeredMessage = new BrokeredMessage(stream, true);
brokeredMessage.Properties[FrameworkConstants.CompressionTypePropertyName] = compressionType;

return brokeredMessage;
}

static async Task<BrokeredMessage> GenerateBrokeredMessageWithBlobKeyPropertyAsync(
Stream stream,
IOrchestrationServiceBlobStore orchestrationServiceBlobStore,
OrchestrationInstance instance,
ServiceBusMessageSettings messageSettings,
DateTime messageFireTime,
string compressionType)
{
if (stream.Length > messageSettings.MessageMaxSizeInBytes)
{
throw new ArgumentException(
$"The serialized message size {stream.Length} is larger than the supported external storage blob size {messageSettings.MessageMaxSizeInBytes}.",
"stream");
}

if (orchestrationServiceBlobStore == null)
{
throw new ArgumentException(
"Please provide an implementation of IOrchestrationServiceBlobStore for external storage.",
"orchestrationServiceBlobStore");
}

// save the compressed stream using external storage when it is larger
// than the supported message size limit.
// the stream is stored using the generated key, which is saved in the message property.
string blobKey = orchestrationServiceBlobStore.BuildMessageBlobKey(instance, messageFireTime);

TraceHelper.TraceInstance(
TraceEventType.Information,
instance,
() => $"Saving the message stream in blob storage using key {blobKey}.");
await orchestrationServiceBlobStore.SaveStreamAsync(blobKey, stream);

BrokeredMessage brokeredMessage = new BrokeredMessage();
brokeredMessage.Properties[FrameworkConstants.MessageBlobKey] = blobKey;
brokeredMessage.Properties[FrameworkConstants.CompressionTypePropertyName] = compressionType;

return brokeredMessage;
}

public static async Task<T> GetObjectFromBrokeredMessageAsync<T>(BrokeredMessage message, IOrchestrationServiceBlobStore orchestrationServiceBlobStore)
{
if (message == null)
{
Expand All @@ -119,7 +186,7 @@ public static async Task<T> GetObjectFromBrokeredMessageAsync<T>(BrokeredMessage
else if (string.Equals(compressionType, FrameworkConstants.CompressionTypeGzipPropertyValue,
StringComparison.OrdinalIgnoreCase))
{
using (var compressedStream = message.GetBody<Stream>())
using (var compressedStream = await LoadMessageStreamAsync(message, orchestrationServiceBlobStore))
{
if (!Utils.IsGzipStream(compressedStream))
{
Expand All @@ -137,7 +204,7 @@ public static async Task<T> GetObjectFromBrokeredMessageAsync<T>(BrokeredMessage
else if (string.Equals(compressionType, FrameworkConstants.CompressionTypeNonePropertyValue,
StringComparison.OrdinalIgnoreCase))
{
using (var rawStream = message.GetBody<Stream>())
using (var rawStream = await LoadMessageStreamAsync(message, orchestrationServiceBlobStore))
{
deserializedObject = Utils.ReadObjectFromStream<T>(rawStream);
}
Expand All @@ -152,6 +219,33 @@ public static async Task<T> GetObjectFromBrokeredMessageAsync<T>(BrokeredMessage
return deserializedObject;
}

static Task<Stream> LoadMessageStreamAsync(BrokeredMessage message, IOrchestrationServiceBlobStore orchestrationServiceBlobStore)
{
object blobKeyObj = null;
string blobKey = string.Empty;

if (message.Properties.TryGetValue(FrameworkConstants.MessageBlobKey, out blobKeyObj))
{
blobKey = (string)blobKeyObj;
}

if (string.IsNullOrEmpty(blobKey))
{
// load the stream from the message directly if the blob key property is not set,
// i.e., it is not stored externally
return Task.Run(() => message.GetBody<Stream>());
}

// if the blob key is set in the message property,
// load the stream message from the service bus message store.
if (orchestrationServiceBlobStore == null)
{
throw new ArgumentException($"Failed to load compressed message from external storage with key: {blobKey}. Please provide an implementation of IServiceBusMessageStore for external storage.", nameof(orchestrationServiceBlobStore));
}

return orchestrationServiceBlobStore.LoadStreamAsync(blobKey);
}

public static void CheckAndLogDeliveryCount(string sessionId, IEnumerable<BrokeredMessage> messages, int maxDeliverycount)
{
foreach (BrokeredMessage message in messages)
Expand Down
8 changes: 8 additions & 0 deletions Framework/DurableTaskFramework.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,22 @@
<ItemGroup>
<Compile Include="History\HistoryStateEvent.cs" />
<Compile Include="IOrchestrationServiceInstanceStore.cs" />
<Compile Include="OrchestrationSessionState.cs" />
<Compile Include="Serializing\RuntimeStateStreamConverter.cs" />
<Compile Include="Settings\JumpStartSettings.cs" />
<Compile Include="Settings\ServiceBusMessageSettings.cs" />
<Compile Include="Settings\ServiceBusOrchestrationServiceSettings.cs" />
<Compile Include="Settings\ServiceBusSessionSettings.cs" />
<Compile Include="Stats\Counter.cs" />
<Compile Include="Stats\ServiceBusOrchestrationServiceStats.cs" />
<Compile Include="Stats\ServiceBusQueueStats.cs" />
<Compile Include="Tracking\AzureStorageBlobStore.cs" />
<Compile Include="Tracking\AzureTableOrchestrationJumpStartEntity.cs" />
<Compile Include="TrackingWorkItem.cs" />
<Compile Include="Tracking\AzureTableInstanceStore.cs" />
<Compile Include="Tracking\BlobStorageClient.cs" />
<Compile Include="Tracking\BlobStorageClientHelper.cs" />
<Compile Include="Tracking\IOrchestrationServiceBlobStore.cs" />
<Compile Include="Tracking\JumpStartManager.cs" />
<Compile Include="Tracking\InstanceEntityBase.cs" />
<Compile Include="Tracking\OrchestrationJumpStartInstanceEntity.cs" />
Expand Down
14 changes: 13 additions & 1 deletion Framework/FrameworkConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,19 @@ internal class FrameworkConstants
public const string CompressionTypeGzipPropertyValue = "gzip";
public const string CompressionTypeNonePropertyValue = "none";

// message blob key in message property
// this property is a key to the message blob when it exceeds the message limit
public const string MessageBlobKey = "MessageBlobKey";

// instance store constants
public const int MaxStringLengthForAzureTableColumn = 1024 * 15; // cut off at 15k * 2 bytes
public const int MaxStringLengthForAzureTableColumn = 1024 * 15; // cut off at 15k * 2 bytes

// default settings for message size
public const int MessageOverflowThresholdInBytesDefault = 170 * 1024;
public const int MessageMaxSizeInBytesDefault = 10 * 1024 * 1024;

// default settings for session size
public const int SessionOverflowThresholdInBytesDefault = 230 * 1024;
public const int SessionMaxSizeInBytesDefault = 10 * 1024 * 1024;
}
}
3 changes: 2 additions & 1 deletion Framework/IOrchestrationServiceClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,10 @@ Task<OrchestrationState> WaitForOrchestrationAsync(

/// <summary>
/// Purges orchestration instance state and history for orchestrations older than the specified threshold time.
/// Also purges the blob storage.
/// </summary>
/// <param name="thresholdDateTimeUtc">Threshold date time in UTC</param>
/// <param name="timeRangeFilterType">What to compare the threshold date time against</param>
Task PurgeOrchestrationInstanceHistoryAsync(DateTime thresholdDateTimeUtc, OrchestrationStateTimeRangeFilterType timeRangeFilterType);
Task PurgeOrchestrationHistoryAsync(DateTime thresholdDateTimeUtc, OrchestrationStateTimeRangeFilterType timeRangeFilterType);
}
}
61 changes: 61 additions & 0 deletions Framework/OrchestrationSessionState.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// ----------------------------------------------------------------------------------
// Copyright Microsoft Corporation
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------------------------------------------------------------

namespace DurableTask
{
using System.Collections.Generic;
using DurableTask.History;

/// <summary>
/// The object that represents the serialized session state.
/// It holds a list of history events (when blob key is empty),
/// or a key for external storage if the serialized stream is too large to fit into the the session state.
/// </summary>
internal class OrchestrationSessionState
{
/// <summary>
/// A constructor for deserialzation.
/// </summary>
public OrchestrationSessionState()
{
}

/// <summary>
/// Wrap a list of history events into an OrchestrationSessionState instance, which will be later serialized as a stream saved in session state.
/// </summary>
/// /// <param name="events">A list of history events.</param>
public OrchestrationSessionState(IList<HistoryEvent> events)
{
this.Events = events;
}

/// <summary>
/// Construct an OrchestrationSessionState instance with a blob key as the blob reference in the external blob storage.
/// </summary>
/// /// <param name="blobKey">The blob key to access the blob</param>
public OrchestrationSessionState(string blobKey)
{
this.BlobKey = blobKey;
}

/// <summary>
/// List of all history events for runtime state
/// </summary>
public IList<HistoryEvent> Events { get; set; }

/// <summary>
/// The blob key for external storage. Could be null or empty if not externally stored.
/// </summary>
public string BlobKey { get; set; }
}
}
Loading

0 comments on commit 6481e6a

Please sign in to comment.