diff --git a/CHANGELOG.md b/CHANGELOG.md index 3f41d31..cca2f38 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +**2024-08-23** + +v8.5.1 + +修复:对象存储,分片并发上传实现非预期 + **2023-12-11** v8.5.0 diff --git a/src/Qiniu/Qiniu.csproj b/src/Qiniu/Qiniu.csproj index 17016b6..a166e1a 100644 --- a/src/Qiniu/Qiniu.csproj +++ b/src/Qiniu/Qiniu.csproj @@ -28,7 +28,7 @@ Qiniu - 8.5.0 + 8.5.1 Rong Zhou, Qiniu SDK Shanghai Qiniu Information Technology Co., Ltd. Qiniu Resource (Cloud) Storage SDK for C# diff --git a/src/Qiniu/QiniuCSharpSDK.cs b/src/Qiniu/QiniuCSharpSDK.cs index 23c8e7b..65e023a 100644 --- a/src/Qiniu/QiniuCSharpSDK.cs +++ b/src/Qiniu/QiniuCSharpSDK.cs @@ -37,6 +37,6 @@ public class QiniuCSharpSDK /// /// SDK版本号 /// - public const string VERSION = "8.5.0"; + public const string VERSION = "8.5.1"; } diff --git a/src/Qiniu/Storage/ResumableUploader.cs b/src/Qiniu/Storage/ResumableUploader.cs index 2448be3..1812b6c 100644 --- a/src/Qiniu/Storage/ResumableUploader.cs +++ b/src/Qiniu/Storage/ResumableUploader.cs @@ -165,16 +165,17 @@ private HttpResult UploadStreamV1(Stream stream, string key, string upToken, Put }; } - //init block upload error + // init block upload error UploadControllerAction upCtrl = putExtra.UploadController(); - ManualResetEvent manualResetEvent = new ManualResetEvent(false); - Dictionary blockDataDict = new Dictionary(); + object progressLock = new object(); + ManualResetEvent upCtrlManualResetEvent = new ManualResetEvent(false); + List runningTaskEvents = new List(); Dictionary blockMakeResults = new Dictionary(); Dictionary uploadedBytesDict = new Dictionary(); uploadedBytesDict.Add("UploadProgress", uploadedBytes); byte[] blockBuffer = new byte[putExtra.PartSize]; - //check not finished blocks to upload + // check not finished blocks to upload for (long blockIndex = 0; blockIndex < blockCount; blockIndex++) { string context = resumeInfo.Contexts[blockIndex]; @@ -195,7 +196,7 @@ private HttpResult UploadStreamV1(Stream stream, string key, string upToken, Put result.RefCode = (int)HttpCode.USER_CANCELED; result.RefText += string.Format("[{0}] [ResumableUpload] Info: upload task is aborted\n", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.ffff")); - manualResetEvent.Set(); + upCtrlManualResetEvent.Set(); return result; } else if (upCtrl == UploadControllerAction.Suspended) @@ -203,7 +204,7 @@ private HttpResult UploadStreamV1(Stream stream, string key, string upToken, Put result.RefCode = (int)HttpCode.USER_PAUSED; result.RefText += string.Format("[{0}] [ResumableUpload] Info: upload task is paused\n", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.ffff")); - manualResetEvent.WaitOne(1000); + upCtrlManualResetEvent.WaitOne(1000); } else if (upCtrl == UploadControllerAction.Activated) { @@ -211,60 +212,61 @@ private HttpResult UploadStreamV1(Stream stream, string key, string upToken, Put } } + // upload blocks concurrently + // wait any task done + while (runningTaskEvents.Count >= putExtra.BlockUploadThreads) + { + int doneIndex = WaitHandle.WaitAny(runningTaskEvents.ToArray()); + runningTaskEvents.RemoveAt(doneIndex); + } + // check mkblk results + foreach (HttpResult mkblkRet in blockMakeResults.Values) + { + if (mkblkRet.Code != 200) + { + result = mkblkRet; + upCtrlManualResetEvent.Set(); + return result; + } + } + blockMakeResults.Clear(); + // add task long offset = blockIndex * putExtra.PartSize; stream.Seek(offset, SeekOrigin.Begin); int blockLen = stream.Read(blockBuffer, 0, putExtra.PartSize); byte[] blockData = new byte[blockLen]; Array.Copy(blockBuffer, blockData, blockLen); - blockDataDict.Add(blockIndex, blockData); - if (blockDataDict.Count == putExtra.BlockUploadThreads) - { - - processMakeBlocks(blockDataDict, upToken, putExtra, resumeInfo, blockMakeResults, uploadedBytesDict, fileSize, - encodedObjectName); - //check mkblk results - foreach (int blkIndex in blockMakeResults.Keys) - { - HttpResult mkblkRet = blockMakeResults[blkIndex]; - if (mkblkRet.Code != 200) - { - result = mkblkRet; - manualResetEvent.Set(); - return result; - } - } - blockDataDict.Clear(); - blockMakeResults.Clear(); - if (!string.IsNullOrEmpty(putExtra.ResumeRecordFile)) - { - ResumeHelper.Save(resumeInfo, putExtra.ResumeRecordFile); - } - } + ManualResetEvent makeBlockEvent = createMakeBlockTask( + blockIndex, + blockData, + upToken, + putExtra, + resumeInfo, + blockMakeResults, + uploadedBytesDict, + fileSize, + encodedObjectName, + progressLock + ); + runningTaskEvents.Add(makeBlockEvent); } - if (blockDataDict.Count > 0) + if (runningTaskEvents.Count > 0) { - processMakeBlocks(blockDataDict, upToken, putExtra, resumeInfo, blockMakeResults, uploadedBytesDict, fileSize, - encodedObjectName); + WaitHandle.WaitAll(runningTaskEvents.ToArray()); //check mkblk results - foreach (int blkIndex in blockMakeResults.Keys) + foreach (HttpResult mkblkRet in blockMakeResults.Values) { - HttpResult mkblkRet = blockMakeResults[blkIndex]; if (mkblkRet.Code != 200) { result = mkblkRet; - manualResetEvent.Set(); + upCtrlManualResetEvent.Set(); return result; } } - blockDataDict.Clear(); blockMakeResults.Clear(); - if (!string.IsNullOrEmpty(putExtra.ResumeRecordFile)) - { - ResumeHelper.Save(resumeInfo, putExtra.ResumeRecordFile); - } } if (upCtrl == UploadControllerAction.Activated) @@ -295,7 +297,7 @@ private HttpResult UploadStreamV1(Stream stream, string key, string upToken, Put DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.ffff")); } - manualResetEvent.Set(); + upCtrlManualResetEvent.Set(); } catch (Exception ex) { @@ -359,7 +361,7 @@ private HttpResult UploadStreamV2(Stream stream, string key, string upToken, Put }; } - //calc upload progress + // calc upload progress for (long blockIndex = 0; blockIndex < blockCount; blockIndex++) { Dictionary etag = resumeInfo.Etags[blockIndex]; @@ -369,111 +371,72 @@ private HttpResult UploadStreamV2(Stream stream, string key, string upToken, Put resumeInfo.Uploaded = uploadedBytes; } } - //set upload progress + // set upload progress putExtra.ProgressHandler(uploadedBytes, fileSize); - //init block upload error - //check not finished blocks to upload + // init block upload error + // check not finished blocks to upload UploadControllerAction upCtrl = putExtra.UploadController(); - ManualResetEvent manualResetEvent = new ManualResetEvent(false); - Dictionary blockDataDict = new Dictionary(); + object progressLock = new object(); + ManualResetEvent upCtrlManualResetEvent = new ManualResetEvent(false); + List runningTaskEvents = new List(); Dictionary blockMakeResults = new Dictionary(); Dictionary uploadedBytesDict = new Dictionary(); uploadedBytesDict.Add("UploadProgress", uploadedBytes); byte[] blockBuffer = new byte[putExtra.PartSize]; + + // check not finished blocks to upload for (long blockIndex = 0; blockIndex < blockCount; blockIndex++) { - string context = null; + bool isBlockExists = false; Dictionary etag = resumeInfo.Etags[blockIndex]; if (etag != null && etag.Count > 0) { - context = "~"; + isBlockExists = true; + } + + if (isBlockExists) { + continue; } - if (string.IsNullOrEmpty(context)) + // check upload controller action before each chunk + while (true) { - //check upload controller action before each chunk - while (true) - { - upCtrl = putExtra.UploadController(); + upCtrl = putExtra.UploadController(); - if (upCtrl == UploadControllerAction.Aborted) - { - result.Code = (int)HttpCode.USER_CANCELED; - result.RefCode = (int)HttpCode.USER_CANCELED; - result.RefText += string.Format("[{0}] [ResumableUpload] Info: upload task is aborted\n", - DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.ffff")); - manualResetEvent.Set(); - return result; - } - else if (upCtrl == UploadControllerAction.Suspended) - { - result.RefCode = (int)HttpCode.USER_PAUSED; - result.RefText += string.Format("[{0}] [ResumableUpload] Info: upload task is paused\n", - DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.ffff")); - manualResetEvent.WaitOne(1000); - } - else if (upCtrl == UploadControllerAction.Activated) - { - break; - } + if (upCtrl == UploadControllerAction.Aborted) + { + result.Code = (int)HttpCode.USER_CANCELED; + result.RefCode = (int)HttpCode.USER_CANCELED; + result.RefText += string.Format("[{0}] [ResumableUpload] Info: upload task is aborted\n", + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.ffff")); + upCtrlManualResetEvent.Set(); + return result; } - - long offset = blockIndex * putExtra.PartSize; - stream.Seek(offset, SeekOrigin.Begin); - int blockLen = stream.Read(blockBuffer, 0, putExtra.PartSize); - byte[] blockData = new byte[blockLen]; - Array.Copy(blockBuffer, blockData, blockLen); - blockDataDict.Add(blockIndex, blockData); - - if (blockDataDict.Count == putExtra.BlockUploadThreads) + else if (upCtrl == UploadControllerAction.Suspended) { - - processMakeBlocks(blockDataDict, upToken, putExtra, resumeInfo, blockMakeResults, uploadedBytesDict, fileSize, - encodedObjectName); - //check mkblk results - foreach (int blkIndex in blockMakeResults.Keys) - { - HttpResult mkblkRet = blockMakeResults[blkIndex]; - if (mkblkRet.Code == (int) HttpCode.FILE_NOT_EXIST) - { - if (File.Exists(putExtra.ResumeRecordFile)) - { - File.Delete(putExtra.ResumeRecordFile); - } - } - if (isResumeUpload && mkblkRet.Code == (int)HttpCode.FILE_NOT_EXIST) - { - stream.Seek(0, SeekOrigin.Begin); - return UploadStreamV2(stream, key, upToken, putExtra, null); - } - if (mkblkRet.Code != (int)HttpCode.OK) - { - result = mkblkRet; - manualResetEvent.Set(); - return result; - } - } - blockDataDict.Clear(); - blockMakeResults.Clear(); - if (!string.IsNullOrEmpty(putExtra.ResumeRecordFile)) - { - ResumeHelper.Save(resumeInfo, putExtra.ResumeRecordFile); - } + result.RefCode = (int)HttpCode.USER_PAUSED; + result.RefText += string.Format("[{0}] [ResumableUpload] Info: upload task is paused\n", + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.ffff")); + upCtrlManualResetEvent.WaitOne(1000); + } + else if (upCtrl == UploadControllerAction.Activated) + { + break; } } - } - if (blockDataDict.Count > 0) - { - processMakeBlocks(blockDataDict, upToken, putExtra, resumeInfo, blockMakeResults, uploadedBytesDict, fileSize, - encodedObjectName); - //check mkblk results - foreach (int blkIndex in blockMakeResults.Keys) + // upload blocks concurrently + // wait any task done + while (runningTaskEvents.Count >= putExtra.BlockUploadThreads) + { + int doneIndex = WaitHandle.WaitAny(runningTaskEvents.ToArray()); + runningTaskEvents.RemoveAt(doneIndex); + } + // check mkblk results + foreach (HttpResult mkblkRet in blockMakeResults.Values) { - HttpResult mkblkRet = blockMakeResults[blkIndex]; - if (mkblkRet.Code == (int) HttpCode.FILE_NOT_EXIST) { if (File.Exists(putExtra.ResumeRecordFile)) @@ -489,17 +452,57 @@ private HttpResult UploadStreamV2(Stream stream, string key, string upToken, Put if (mkblkRet.Code != (int)HttpCode.OK) { result = mkblkRet; - manualResetEvent.Set(); + upCtrlManualResetEvent.Set(); return result; } } - blockDataDict.Clear(); blockMakeResults.Clear(); - if (!string.IsNullOrEmpty(putExtra.ResumeRecordFile)) + // add task + long offset = blockIndex * putExtra.PartSize; + stream.Seek(offset, SeekOrigin.Begin); + int blockLen = stream.Read(blockBuffer, 0, putExtra.PartSize); + byte[] blockData = new byte[blockLen]; + Array.Copy(blockBuffer, blockData, blockLen); + + ManualResetEvent makeBlockEvent = createMakeBlockTask( + blockIndex, + blockData, + upToken, + putExtra, + resumeInfo, + blockMakeResults, + uploadedBytesDict, + fileSize, + encodedObjectName, + progressLock + ); + runningTaskEvents.Add(makeBlockEvent); + } + + WaitHandle.WaitAll(runningTaskEvents.ToArray()); + //check mkblk results + foreach (HttpResult mkblkRet in blockMakeResults.Values) + { + if (mkblkRet.Code == (int) HttpCode.FILE_NOT_EXIST) { - ResumeHelper.Save(resumeInfo, putExtra.ResumeRecordFile); + if (File.Exists(putExtra.ResumeRecordFile)) + { + File.Delete(putExtra.ResumeRecordFile); + } + } + if (isResumeUpload && mkblkRet.Code == (int)HttpCode.FILE_NOT_EXIST) + { + stream.Seek(0, SeekOrigin.Begin); + return UploadStreamV2(stream, key, upToken, putExtra, null); + } + if (mkblkRet.Code != (int)HttpCode.OK) + { + result = mkblkRet; + upCtrlManualResetEvent.Set(); + return result; } } + blockMakeResults.Clear(); if (upCtrl == UploadControllerAction.Activated) { @@ -542,7 +545,7 @@ private HttpResult UploadStreamV2(Stream stream, string key, string upToken, Put DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.ffff")); } - manualResetEvent.Set(); + upCtrlManualResetEvent.Set(); } catch (Exception ex) { @@ -570,37 +573,35 @@ private HttpResult UploadStreamV2(Stream stream, string key, string upToken, Put return result; } - private void processMakeBlocks(Dictionary blockDataDict, string upToken, - PutExtra putExtra, ResumeInfo resumeInfo, Dictionary blockMakeResults, - Dictionary uploadedBytesDict, long fileSize, string encodedObjectName) + private ManualResetEvent createMakeBlockTask( + long blockIndex, + byte[] blockData, + string upToken, + PutExtra putExtra, + ResumeInfo resumeInfo, + Dictionary blockMakeResults, + Dictionary uploadedBytesDict, // total uploaded size + long fileSize, + string encodedObjectName, + object progressLock + ) { - int taskMax = blockDataDict.Count; - ManualResetEvent[] doneEvents = new ManualResetEvent[taskMax]; - int eventIndex = 0; - object progressLock = new object(); - foreach (long blockIndex in blockDataDict.Keys) - { - //signal task - ManualResetEvent doneEvent = new ManualResetEvent(false); - doneEvents[eventIndex] = doneEvent; - eventIndex += 1; - - //queue task - byte[] blockData = blockDataDict[blockIndex]; - ResumeBlocker resumeBlocker = new ResumeBlocker(doneEvent, blockData, blockIndex, upToken, putExtra, - resumeInfo, blockMakeResults, progressLock, uploadedBytesDict, fileSize, encodedObjectName); - ThreadPool.QueueUserWorkItem(new WaitCallback(this.MakeBlockWithRetry), resumeBlocker); - } - - try - { - WaitHandle.WaitAll(doneEvents); - } - catch (Exception ex) - { - Console.WriteLine("wait all exceptions:" + ex.StackTrace); - //pass - } + ManualResetEvent doneEvent = new ManualResetEvent(false); + ResumeBlocker resumeBlocker = new ResumeBlocker( + doneEvent, + blockData, + blockIndex, + upToken, + putExtra, + resumeInfo, + blockMakeResults, + progressLock, + uploadedBytesDict, + fileSize, + encodedObjectName + ); + ThreadPool.QueueUserWorkItem(MakeBlockWithRetry, resumeBlocker); + return doneEvent; } private void MakeBlockWithRetry(object resumeBlockerObj) @@ -610,6 +611,7 @@ private void MakeBlockWithRetry(object resumeBlockerObj) Dictionary blockMakeResults = resumeBlocker.BlockMakeResults; long blockIndex = resumeBlocker.BlockIndex; PutExtra putExtra = resumeBlocker.PutExtra; + ResumeInfo resumeInfo = resumeBlocker.ResumeInfo; HttpResult result = MakeBlock(resumeBlockerObj); @@ -624,6 +626,11 @@ private void MakeBlockWithRetry(object resumeBlockerObj) retryTimes += 1; } + if (!string.IsNullOrEmpty(putExtra.ResumeRecordFile)) + { + ResumeHelper.Save(resumeInfo, putExtra.ResumeRecordFile); + } + //return the http result blockMakeResults.Add(blockIndex, result); doneEvent.Set(); diff --git a/src/QiniuTests/Http/Middleware.cs b/src/QiniuTests/Http/Middleware.cs index c2b45df..4fe8acf 100644 --- a/src/QiniuTests/Http/Middleware.cs +++ b/src/QiniuTests/Http/Middleware.cs @@ -41,7 +41,7 @@ public void SendWithMiddlewareTest() new RecorderMiddleware(orderRecorder, "B") }; - HttpResult resp = httpManager.Get("https://qiniu.com/index.html", null, null, middlewares); + HttpResult resp = httpManager.Get("https://example.com/index.html", null, null, middlewares); Assert.AreEqual((int)HttpCode.OK, resp.Code, resp.ToString()); CollectionAssert.AreEqual( @@ -70,7 +70,7 @@ public void RetryDomainsMiddlewareTest() new List { "unavailable.csharpsdk.qiniu.com", - "qiniu.com" + "example.com" }, 3 ), diff --git a/src/QiniuTests/Storage/BucketManagerTests.cs b/src/QiniuTests/Storage/BucketManagerTests.cs index a22d596..f31bee3 100644 --- a/src/QiniuTests/Storage/BucketManagerTests.cs +++ b/src/QiniuTests/Storage/BucketManagerTests.cs @@ -139,7 +139,7 @@ public void ChangeTypeTest() string key = "qiniu.png"; string newKey = "qiniu-to-change-type.png"; - HttpResult copyRet = bucketManager.Copy(Bucket, "qiniu.png", Bucket, newKey, true); + HttpResult copyRet = bucketManager.Copy(Bucket, key, Bucket, newKey, true); if (copyRet.Code != (int)HttpCode.OK) { Assert.Fail("copy error: " + copyRet.ToString()); diff --git a/src/QiniuTests/Storage/ResumableUploaderTests.cs b/src/QiniuTests/Storage/ResumableUploaderTests.cs index 2981ddb..0a3029e 100644 --- a/src/QiniuTests/Storage/ResumableUploaderTests.cs +++ b/src/QiniuTests/Storage/ResumableUploaderTests.cs @@ -50,6 +50,8 @@ public void UploadFileTest() putExtra.Params["x:var_1"] = "val_1"; putExtra.Params["x:var_2"] = "val_2"; + putExtra.BlockUploadThreads = 2; + ResumableUploader target = new ResumableUploader(config); HttpResult result = target.UploadFile(filePath, key, token, putExtra); Console.WriteLine("chunk upload result: " + result.ToString()); @@ -115,6 +117,7 @@ public void UploadFileV2Test() extra.Params = new Dictionary(); extra.Params["x:var_1"] = "val_1"; extra.Params["x:var_2"] = "val_2"; + extra.BlockUploadThreads = 2; ResumableUploader target = new ResumableUploader(config); HttpResult result = target.UploadFile(filePath, key, token, extra); Console.WriteLine("chunk upload result: " + result.ToString()); @@ -383,7 +386,7 @@ public void ResumeUploadFileV2WithoutKeyTest() } } - File.Delete(filePath); + File.Delete(filePath); } }