Skip to content

Commit

Permalink
Dispatcher middleware for task orchestrations and task activities (#95)
Browse files Browse the repository at this point in the history
  • Loading branch information
cgillum authored and simonporter committed Jun 7, 2017
1 parent 6391a67 commit fffab58
Show file tree
Hide file tree
Showing 8 changed files with 402 additions and 52 deletions.
165 changes: 165 additions & 0 deletions Test/DurableTask.Core.Tests/DispatcherMiddlewareTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
// ----------------------------------------------------------------------------------
// Copyright Microsoft Corporation
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------------------------------------------------------------

namespace DurableTask.Core.Tests
{
using System;
using System.Diagnostics;
using System.Text;
using System.Threading.Tasks;
using DurableTask.Core.History;
using DurableTask.Emulator;
using DurableTask.Test.Orchestrations;
using Microsoft.VisualStudio.TestTools.UnitTesting;

[TestClass]
public class DispatcherMiddlewareTests
{
TaskHubWorker worker;
TaskHubClient client;

[TestInitialize]
public async Task Initialize()
{
var service = new LocalOrchestrationService();
this.worker = new TaskHubWorker(service);

await this.worker
.AddTaskOrchestrations(typeof(SimplestGreetingsOrchestration))
.AddTaskActivities(typeof(SimplestGetUserTask), typeof(SimplestSendGreetingTask))
.StartAsync();

this.client = new TaskHubClient(service);
}

[TestCleanup]
public async Task TestCleanup()
{
await this.worker.StopAsync(true);
}

[TestMethod]
public async Task DispatchMiddlewareContextBuiltInProperties()
{
TaskOrchestration orchestration = null;
OrchestrationRuntimeState state = null;
OrchestrationInstance instance1 = null;

TaskActivity activity = null;
TaskScheduledEvent taskScheduledEvent = null;
OrchestrationInstance instance2 = null;

this.worker.AddOrchestrationDispatcherMiddleware((context, next) =>
{
orchestration = context.GetProperty<TaskOrchestration>();
state = context.GetProperty<OrchestrationRuntimeState>();
instance1 = context.GetProperty<OrchestrationInstance>();
return next();
});

this.worker.AddActivityDispatcherMiddleware((context, next) =>
{
activity = context.GetProperty<TaskActivity>();
taskScheduledEvent = context.GetProperty<TaskScheduledEvent>();
instance2 = context.GetProperty<OrchestrationInstance>();
return next();
});

var instance = await this.client.CreateOrchestrationInstanceAsync(typeof(SimplestGreetingsOrchestration), null);

TimeSpan timeout = TimeSpan.FromSeconds(Debugger.IsAttached ? 1000 : 10);
await this.client.WaitForOrchestrationAsync(instance, timeout);

Assert.IsNotNull(orchestration);
Assert.IsNotNull(state);
Assert.IsNotNull(instance1);

Assert.IsNotNull(activity);
Assert.IsNotNull(taskScheduledEvent);
Assert.IsNotNull(instance2);

Assert.AreNotSame(instance1, instance2);
Assert.AreEqual(instance1.InstanceId, instance2.InstanceId);
}

[TestMethod]
public async Task OrchestrationDispatcherMiddlewareContextFlow()
{
StringBuilder output = null;

for (int i = 0; i < 10; i++)
{
string value = i.ToString();
this.worker.AddOrchestrationDispatcherMiddleware(async (context, next) =>
{
output = context.GetProperty<StringBuilder>("output");
if (output == null)
{
output = new StringBuilder();
context.SetProperty("output", output);
}
output.Append(value);
await next();
output.Append(value);
});
}

var instance = await this.client.CreateOrchestrationInstanceAsync(typeof(SimplestGreetingsOrchestration), null);

TimeSpan timeout = TimeSpan.FromSeconds(Debugger.IsAttached ? 1000 : 10);
await this.client.WaitForOrchestrationAsync(instance, timeout);

// Each reply gets a new context, so the output should stay the same regardless of how
// many replays an orchestration goes through.
Assert.IsNotNull(output);
Assert.AreEqual("01234567899876543210", output.ToString());
}

[TestMethod]
public async Task ActivityDispatcherMiddlewareContextFlow()
{
StringBuilder output = null;

for (int i = 0; i < 10; i++)
{
string value = i.ToString();
this.worker.AddActivityDispatcherMiddleware(async (context, next) =>
{
output = context.GetProperty<StringBuilder>("output");
if (output == null)
{
output = new StringBuilder();
context.SetProperty("output", output);
}
output.Append(value);
await next();
output.Append(value);
});
}

var instance = await this.client.CreateOrchestrationInstanceAsync(typeof(SimplestGreetingsOrchestration), null);

TimeSpan timeout = TimeSpan.FromSeconds(Debugger.IsAttached ? 1000 : 10);
await this.client.WaitForOrchestrationAsync(instance, timeout);

// Each actiivty gets a new context, so the output should stay the same regardless of how
// many activities an orchestration schedules (as long as there is at least one).
Assert.IsNotNull(output);
Assert.AreEqual("01234567899876543210", output.ToString());
}
}
}
75 changes: 75 additions & 0 deletions src/DurableTask.Core/Middleware/DispatchMiddlewareContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// ----------------------------------------------------------------------------------
// Copyright Microsoft Corporation
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------------------------------------------------------------

namespace DurableTask.Core.Middleware
{
using System;
using System.Collections.Generic;

/// <summary>
/// Context data that can be used to share data between middleware.
/// </summary>
public class DispatchMiddlewareContext
{
internal DispatchMiddlewareContext()
{
}

/// <summary>
/// Sets a property value to the context using the full name of the type as the key.
/// </summary>
/// <typeparam name="T">The type of the property.</typeparam>
/// <param name="value">The value of the property.</param>
public void SetProperty<T>(T value)
{
this.SetProperty(typeof(T).FullName, value);
}

/// <summary>
/// Sets a named property value to the context.
/// </summary>
/// <typeparam name="T">The type of the property.</typeparam>
/// <param name="key">The name of the property.</param>
/// <param name="value">The value of the property.</param>
public void SetProperty<T>(string key, T value)
{
this.Properties[key] = value;
}

/// <summary>
/// Gets a property value from the context using the full name of <typeparamref name="T"/>.
/// </summary>
/// <typeparam name="T">The type of the property.</typeparam>
/// <returns>The value of the property or <c>default(T)</c> if the property is not defined.</returns>
public T GetProperty<T>()
{
return this.GetProperty<T>(typeof(T).FullName);
}

/// <summary>
/// Gets a named property value from the context.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="key">The name of the property value.</param>
/// <returns>The value of the property or <c>default(T)</c> if the property is not defined.</returns>
public T GetProperty<T>(string key)
{
return this.Properties.TryGetValue(key, out object value) ? (T)value : default(T);
}

/// <summary>
/// Gets a key/value collection that can be used to share data between middleware.
/// </summary>
public IDictionary<string, object> Properties { get; } = new Dictionary<string, object>(StringComparer.Ordinal);
}
}
24 changes: 24 additions & 0 deletions src/DurableTask.Core/Middleware/DispatchMiddlewareDelegate.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// ----------------------------------------------------------------------------------
// Copyright Microsoft Corporation
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------------------------------------------------------------

namespace DurableTask.Core.Middleware
{
using System.Threading.Tasks;

/// <summary>
/// A function that runs in the task execution middleware pipeline.
/// </summary>
/// <param name="context">The <see cref="DispatchMiddlewareContext"/> for the task execution.</param>
/// <returns>A task that represents the completion of the durable task execution.</returns>
public delegate Task DispatchMiddlewareDelegate(DispatchMiddlewareContext context);
}
49 changes: 49 additions & 0 deletions src/DurableTask.Core/Middleware/DispatchMiddlewarePipeline.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// ----------------------------------------------------------------------------------
// Copyright Microsoft Corporation
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------------------------------------------------------------

namespace DurableTask.Core.Middleware
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

class DispatchMiddlewarePipeline
{
readonly IList<Func<DispatchMiddlewareDelegate, DispatchMiddlewareDelegate>> components =
new List<Func<DispatchMiddlewareDelegate, DispatchMiddlewareDelegate>>();

public Task RunAsync(DispatchMiddlewareContext context, DispatchMiddlewareDelegate handler)
{
// Build the delegate chain
foreach (var component in this.components.Reverse())
{
handler = component(handler);
}

return handler(context);
}

public void Add(Func<DispatchMiddlewareContext, Func<Task>, Task> middleware)
{
this.components.Add(next =>
{
return context =>
{
Func<Task> simpleNext = () => next(context);
return middleware(context, simpleNext);
};
});
}
}
}
Loading

0 comments on commit fffab58

Please sign in to comment.