diff --git a/Minio.Functional.Tests/FunctionalTest.cs b/Minio.Functional.Tests/FunctionalTest.cs index 141a21b94..4bbca7808 100644 --- a/Minio.Functional.Tests/FunctionalTest.cs +++ b/Minio.Functional.Tests/FunctionalTest.cs @@ -17,14 +17,12 @@ using System; using System.Collections.Generic; -using System.ComponentModel; using System.Diagnostics; using System.Globalization; using System.IO; using System.Linq; using System.Net; using System.Net.Http; -using System.Reflection; using System.Runtime.InteropServices; using System.Security.Cryptography; using System.Text; @@ -273,6 +271,23 @@ public static string GetRandomName(int length = 5) return "minio-dotnet-example-" + result; } + internal static void generateRandomFile(string fileName) + { + using (var fs = new FileStream(fileName, FileMode.Create, FileAccess.Write, FileShare.None, 4096, true)) + { + var fileSize = 3L * 1024 * 1024 * 1024; + var segments = fileSize / 10000; + var last_seg = fileSize % 10000; + var br = new BinaryWriter(fs); + + for (long i = 0; i < segments; i++) + br.Write(new byte[10000]); + + br.Write(new byte[last_seg]); + br.Close(); + } + } + // Return true if running in Mint mode public static bool IsMintEnv() { @@ -4023,36 +4038,6 @@ internal static async Task CopyObject_Test7(MinioClient minio) } } - public static void objPrint(object obj) - { - foreach (PropertyDescriptor descriptor in TypeDescriptor.GetProperties(obj)) - { - var name = descriptor.Name; - var value = descriptor.GetValue(obj); - Console.WriteLine("{0}={1}", name, value); - } - } - - public static void Print(object obj) - { - foreach (var prop in obj.GetType() - .GetProperties(BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance)) - { - var value = prop.GetValue(obj, new object[] { }); - Console.WriteLine("{0} = {1}", prop.Name, value); - } - - Console.WriteLine("DONE!\n\n"); - } - - public static void printDict(Dictionary d) - { - if (d != null) - foreach (var kv in d) - Console.WriteLine(" {0} = {1}", kv.Key, kv.Value); - Console.WriteLine("DONE!\n\n"); - } - internal static async Task CopyObject_Test8(MinioClient minio) { var startTime = DateTime.Now; @@ -4788,6 +4773,83 @@ internal static async Task GetObject_3_OffsetLength_Tests(MinioClient minio) } } + internal static async Task GetObject_AsyncCallback_Test1(MinioClient minio) + { + var startTime = DateTime.Now; + var bucketName = GetRandomName(15); + var objectName = GetRandomObjectName(10); + string contentType = null; + var fileName = GetRandomName(10); + var destFileName = GetRandomName(10); + var args = new Dictionary + { + { "bucketName", bucketName }, + { "objectName", objectName }, + { "contentType", contentType } + }; + + try + { + // Create a large local file + if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) generateRandomFile(fileName); + else Bash("truncate -s 2G " + fileName); + + // Create the bucket + await Setup_Test(minio, bucketName); + + using (var filestream = new FileStream(File.OpenHandle(fileName), FileAccess.Read)) + { + // Upload the large file, "fileName", into the bucket + var size = filestream.Length; + long file_read_size = 0; + var putObjectArgs = new PutObjectArgs() + .WithBucket(bucketName) + .WithObject(objectName) + .WithStreamData(filestream) + .WithObjectSize(filestream.Length) + .WithContentType(contentType); + + await minio.PutObjectAsync(putObjectArgs).ConfigureAwait(false); + + var callbackAsync = async delegate(Stream stream, CancellationToken cancellationToken) + { + using (var dest = new FileStream(destFileName, FileMode.Create, FileAccess.Write)) + { + await stream.CopyToAsync(dest); + } + }; + + var getObjectArgs = new GetObjectArgs() + .WithBucket(bucketName) + .WithObject(objectName) + .WithCallbackStream(async (stream, cancellationToken) => await callbackAsync(stream, default)); + + await minio.GetObjectAsync(getObjectArgs).ConfigureAwait(false); + var writtenInfo = new FileInfo(destFileName); + file_read_size = writtenInfo.Length; + Assert.AreEqual(size, file_read_size); + + new MintLogger("GetObject_LargeFile_Test0", getObjectSignature, + "Tests whether GetObject as stream works", + TestStatus.PASS, DateTime.Now - startTime, args: args).Log(); + } + } + catch (Exception ex) + { + new MintLogger("GetObject_LargeFile_Test0", getObjectSignature, "Tests whether GetObject as stream works", + TestStatus.FAIL, DateTime.Now - startTime, ex.Message, ex.ToString(), args: args).Log(); + throw; + } + finally + { + if (File.Exists(fileName)) + File.Delete(fileName); + if (File.Exists(destFileName)) + File.Delete(destFileName); + await TearDown(minio, bucketName); + } + } + internal static async Task FGetObject_Test1(MinioClient minio) { var startTime = DateTime.Now; diff --git a/Minio.Functional.Tests/Program.cs b/Minio.Functional.Tests/Program.cs index 39cd9288a..c9d045130 100644 --- a/Minio.Functional.Tests/Program.cs +++ b/Minio.Functional.Tests/Program.cs @@ -141,6 +141,8 @@ public static void Main(string[] args) // and length parameters. Tests will be reported as GetObject_Test3, // GetObject_Test4 and GetObject_Test5. FunctionalTest.GetObject_3_OffsetLength_Tests(minioClient).Wait(); + // Test async callback function to download an object + FunctionalTest.GetObject_AsyncCallback_Test1(minioClient).Wait(); // Test File GetObject and PutObject functions FunctionalTest.FGetObject_Test1(minioClient).Wait(); diff --git a/Minio/DataModel/ObjectOperationsArgs.cs b/Minio/DataModel/ObjectOperationsArgs.cs index 8908e69e1..e21ce8eba 100644 --- a/Minio/DataModel/ObjectOperationsArgs.cs +++ b/Minio/DataModel/ObjectOperationsArgs.cs @@ -22,6 +22,8 @@ using System.Net.Http; using System.Security.Cryptography; using System.Text; +using System.Threading; +using System.Threading.Tasks; using System.Xml; using System.Xml.Linq; using Minio.DataModel; @@ -509,6 +511,7 @@ public GetObjectArgs() } internal Action CallBack { get; private set; } + internal Func FuncCallBack { get; private set; } internal long ObjectOffset { get; private set; } internal long ObjectLength { get; private set; } internal string FileName { get; private set; } @@ -517,7 +520,7 @@ public GetObjectArgs() internal override void Validate() { base.Validate(); - if (CallBack == null && string.IsNullOrEmpty(FileName)) + if (CallBack == null && FuncCallBack == null && string.IsNullOrEmpty(FileName)) throw new MinioException("Atleast one of " + nameof(CallBack) + ", CallBack method or " + nameof(FileName) + " file path to save need to be set for GetObject operation."); if (OffsetLengthSet) @@ -551,7 +554,10 @@ private void Populate() internal override HttpRequestMessageBuilder BuildRequest(HttpRequestMessageBuilder requestMessageBuilder) { if (!string.IsNullOrEmpty(VersionId)) requestMessageBuilder.AddQueryParameter("versionId", $"{VersionId}"); - requestMessageBuilder.ResponseWriter = CallBack; + + if (CallBack is not null) requestMessageBuilder.ResponseWriter = CallBack; + else requestMessageBuilder.FunctionResponseWriter = FuncCallBack; + if (Headers.ContainsKey(S3ZipExtractKey)) requestMessageBuilder.AddQueryParameter(S3ZipExtractKey, Headers[S3ZipExtractKey]); @@ -565,6 +571,12 @@ public GetObjectArgs WithCallbackStream(Action cb) return this; } + public GetObjectArgs WithCallbackStream(Func cb) + { + FuncCallBack = cb; + return this; + } + public GetObjectArgs WithOffsetAndLength(long offset, long length) { OffsetLengthSet = true; diff --git a/Minio/Helper/OperationsHelper.cs b/Minio/Helper/OperationsHelper.cs index 0201f7cd5..ca8d40b2c 100644 --- a/Minio/Helper/OperationsHelper.cs +++ b/Minio/Helper/OperationsHelper.cs @@ -53,8 +53,9 @@ private async Task getObjectHelper(GetObjectArgs args, CancellationT args.Validate(); if (args.FileName != null) await getObjectFileAsync(args, objStat, cancellationToken); - else + else if (args.CallBack is not null) await getObjectStreamAsync(args, objStat, args.CallBack, cancellationToken); + else await getObjectStreamAsync(args, objStat, args.FuncCallBack, cancellationToken); return objStat; } @@ -70,7 +71,6 @@ private Task getObjectFileAsync(GetObjectArgs args, ObjectStat objectStat, var length = objectStat.Size; var etag = objectStat.ETag; - long tempFileSize = 0; var tempFileName = $"{args.FileName}.{etag}.part.minio"; if (!string.IsNullOrEmpty(args.VersionId)) tempFileName = $"{args.FileName}.{etag}.{args.VersionId}.part.minio"; if (File.Exists(args.FileName)) File.Delete(args.FileName); @@ -78,18 +78,19 @@ private Task getObjectFileAsync(GetObjectArgs args, ObjectStat objectStat, utils.ValidateFile(tempFileName); if (File.Exists(tempFileName)) File.Delete(tempFileName); - args = args.WithCallbackStream(stream => + var callbackAsync = async delegate(Stream stream, CancellationToken cancellationToken) { - var fileStream = File.Create(tempFileName); - stream.CopyTo(fileStream); - fileStream.Dispose(); - var writtenInfo = new FileInfo(tempFileName); - var writtenSize = writtenInfo.Length; - if (writtenSize != length - tempFileSize) - throw new IOException(tempFileName + - ": Unexpected data written. Expected = " - + (length - tempFileSize) - + ", written = " + writtenSize); + using (var dest = new FileStream(tempFileName, FileMode.Create, FileAccess.Write)) + { + await stream.CopyToAsync(dest); + } + }; + + var cts = new CancellationTokenSource(); + cts.CancelAfter(TimeSpan.FromMilliseconds(15)); + args.WithCallbackStream(async (stream, cancellationToken) => + { + await callbackAsync(stream, cts.Token); utils.MoveWithReplace(tempFileName, args.FileName); }); return getObjectStreamAsync(args, objectStat, null, cancellationToken); @@ -114,6 +115,29 @@ private async Task getObjectStreamAsync(GetObjectArgs args, ObjectStat objectSta .ConfigureAwait(false); } + /// + /// private helper method. It returns the specified portion or full object from the bucket + /// + /// GetObjectArgs Arguments Object encapsulates information like - bucket name, object name etc + /// + /// ObjectStat object encapsulates information like - object name, size, etag etc, represents + /// Object Information + /// + /// + /// Callback function to send/process Object contents using + /// async Func object which takes Stream and CancellationToken as input + /// and Task as output, if assigned + /// + /// Optional cancellation token to cancel the operation + private async Task getObjectStreamAsync(GetObjectArgs args, ObjectStat objectStat, + Func cb, + CancellationToken cancellationToken = default) + { + var requestMessageBuilder = await CreateRequest(args).ConfigureAwait(false); + using var response = await ExecuteTaskAsync(NoErrorHandlers, requestMessageBuilder, cancellationToken) + .ConfigureAwait(false); + } + /// /// private helper method to remove list of objects from bucket ///