Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dispatcher middleware for task orchestrations and task activities #95

Merged
merged 1 commit into from
Jun 7, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 165 additions & 0 deletions Test/DurableTask.Core.Tests/DispatcherMiddlewareTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
// ----------------------------------------------------------------------------------
// Copyright Microsoft Corporation
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------------------------------------------------------------

namespace DurableTask.Core.Tests
{
using System;
using System.Diagnostics;
using System.Text;
using System.Threading.Tasks;
using DurableTask.Core.History;
using DurableTask.Emulator;
using DurableTask.Test.Orchestrations;
using Microsoft.VisualStudio.TestTools.UnitTesting;

[TestClass]
public class DispatcherMiddlewareTests
{
TaskHubWorker worker;
TaskHubClient client;

[TestInitialize]
public async Task Initialize()
{
var service = new LocalOrchestrationService();
this.worker = new TaskHubWorker(service);

await this.worker
.AddTaskOrchestrations(typeof(SimplestGreetingsOrchestration))
.AddTaskActivities(typeof(SimplestGetUserTask), typeof(SimplestSendGreetingTask))
.StartAsync();

this.client = new TaskHubClient(service);
}

[TestCleanup]
public async Task TestCleanup()
{
await this.worker.StopAsync(true);
}

[TestMethod]
public async Task DispatchMiddlewareContextBuiltInProperties()
{
TaskOrchestration orchestration = null;
OrchestrationRuntimeState state = null;
OrchestrationInstance instance1 = null;

TaskActivity activity = null;
TaskScheduledEvent taskScheduledEvent = null;
OrchestrationInstance instance2 = null;

this.worker.AddOrchestrationDispatcherMiddleware((context, next) =>
{
orchestration = context.GetProperty<TaskOrchestration>();
state = context.GetProperty<OrchestrationRuntimeState>();
instance1 = context.GetProperty<OrchestrationInstance>();

return next();
});

this.worker.AddActivityDispatcherMiddleware((context, next) =>
{
activity = context.GetProperty<TaskActivity>();
taskScheduledEvent = context.GetProperty<TaskScheduledEvent>();
instance2 = context.GetProperty<OrchestrationInstance>();

return next();
});

var instance = await this.client.CreateOrchestrationInstanceAsync(typeof(SimplestGreetingsOrchestration), null);

TimeSpan timeout = TimeSpan.FromSeconds(Debugger.IsAttached ? 1000 : 10);
await this.client.WaitForOrchestrationAsync(instance, timeout);

Assert.IsNotNull(orchestration);
Assert.IsNotNull(state);
Assert.IsNotNull(instance1);

Assert.IsNotNull(activity);
Assert.IsNotNull(taskScheduledEvent);
Assert.IsNotNull(instance2);

Assert.AreNotSame(instance1, instance2);
Assert.AreEqual(instance1.InstanceId, instance2.InstanceId);
}

[TestMethod]
public async Task OrchestrationDispatcherMiddlewareContextFlow()
{
StringBuilder output = null;

for (int i = 0; i < 10; i++)
{
string value = i.ToString();
this.worker.AddOrchestrationDispatcherMiddleware(async (context, next) =>
{
output = context.GetProperty<StringBuilder>("output");
if (output == null)
{
output = new StringBuilder();
context.SetProperty("output", output);
}

output.Append(value);
await next();
output.Append(value);
});
}

var instance = await this.client.CreateOrchestrationInstanceAsync(typeof(SimplestGreetingsOrchestration), null);

TimeSpan timeout = TimeSpan.FromSeconds(Debugger.IsAttached ? 1000 : 10);
await this.client.WaitForOrchestrationAsync(instance, timeout);

// Each reply gets a new context, so the output should stay the same regardless of how
// many replays an orchestration goes through.
Assert.IsNotNull(output);
Assert.AreEqual("01234567899876543210", output.ToString());
}

[TestMethod]
public async Task ActivityDispatcherMiddlewareContextFlow()
{
StringBuilder output = null;

for (int i = 0; i < 10; i++)
{
string value = i.ToString();
this.worker.AddActivityDispatcherMiddleware(async (context, next) =>
{
output = context.GetProperty<StringBuilder>("output");
if (output == null)
{
output = new StringBuilder();
context.SetProperty("output", output);
}

output.Append(value);
await next();
output.Append(value);
});
}

var instance = await this.client.CreateOrchestrationInstanceAsync(typeof(SimplestGreetingsOrchestration), null);

TimeSpan timeout = TimeSpan.FromSeconds(Debugger.IsAttached ? 1000 : 10);
await this.client.WaitForOrchestrationAsync(instance, timeout);

// Each actiivty gets a new context, so the output should stay the same regardless of how
// many activities an orchestration schedules (as long as there is at least one).
Assert.IsNotNull(output);
Assert.AreEqual("01234567899876543210", output.ToString());
}
}
}
75 changes: 75 additions & 0 deletions src/DurableTask.Core/Middleware/DispatchMiddlewareContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// ----------------------------------------------------------------------------------
// Copyright Microsoft Corporation
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------------------------------------------------------------

namespace DurableTask.Core.Middleware
{
using System;
using System.Collections.Generic;

/// <summary>
/// Context data that can be used to share data between middleware.
/// </summary>
public class DispatchMiddlewareContext
{
internal DispatchMiddlewareContext()
{
}

/// <summary>
/// Sets a property value to the context using the full name of the type as the key.
/// </summary>
/// <typeparam name="T">The type of the property.</typeparam>
/// <param name="value">The value of the property.</param>
public void SetProperty<T>(T value)
{
this.SetProperty(typeof(T).FullName, value);
}

/// <summary>
/// Sets a named property value to the context.
/// </summary>
/// <typeparam name="T">The type of the property.</typeparam>
/// <param name="key">The name of the property.</param>
/// <param name="value">The value of the property.</param>
public void SetProperty<T>(string key, T value)
{
this.Properties[key] = value;
}

/// <summary>
/// Gets a property value from the context using the full name of <typeparamref name="T"/>.
/// </summary>
/// <typeparam name="T">The type of the property.</typeparam>
/// <returns>The value of the property or <c>default(T)</c> if the property is not defined.</returns>
public T GetProperty<T>()
{
return this.GetProperty<T>(typeof(T).FullName);
}

/// <summary>
/// Gets a named property value from the context.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="key">The name of the property value.</param>
/// <returns>The value of the property or <c>default(T)</c> if the property is not defined.</returns>
public T GetProperty<T>(string key)
{
return this.Properties.TryGetValue(key, out object value) ? (T)value : default(T);
}

/// <summary>
/// Gets a key/value collection that can be used to share data between middleware.
/// </summary>
public IDictionary<string, object> Properties { get; } = new Dictionary<string, object>(StringComparer.Ordinal);
}
}
24 changes: 24 additions & 0 deletions src/DurableTask.Core/Middleware/DispatchMiddlewareDelegate.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// ----------------------------------------------------------------------------------
// Copyright Microsoft Corporation
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------------------------------------------------------------

namespace DurableTask.Core.Middleware
{
using System.Threading.Tasks;

/// <summary>
/// A function that runs in the task execution middleware pipeline.
/// </summary>
/// <param name="context">The <see cref="DispatchMiddlewareContext"/> for the task execution.</param>
/// <returns>A task that represents the completion of the durable task execution.</returns>
public delegate Task DispatchMiddlewareDelegate(DispatchMiddlewareContext context);
}
49 changes: 49 additions & 0 deletions src/DurableTask.Core/Middleware/DispatchMiddlewarePipeline.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// ----------------------------------------------------------------------------------
// Copyright Microsoft Corporation
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------------------------------------------------------------

namespace DurableTask.Core.Middleware
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

class DispatchMiddlewarePipeline
{
readonly IList<Func<DispatchMiddlewareDelegate, DispatchMiddlewareDelegate>> components =
new List<Func<DispatchMiddlewareDelegate, DispatchMiddlewareDelegate>>();

public Task RunAsync(DispatchMiddlewareContext context, DispatchMiddlewareDelegate handler)
{
// Build the delegate chain
foreach (var component in this.components.Reverse())
{
handler = component(handler);
}

return handler(context);
}

public void Add(Func<DispatchMiddlewareContext, Func<Task>, Task> middleware)
{
this.components.Add(next =>
{
return context =>
{
Func<Task> simpleNext = () => next(context);
return middleware(context, simpleNext);
};
});
}
}
}
Loading