Skip to content

Commit

Permalink
add lock
Browse files Browse the repository at this point in the history
  • Loading branch information
trueai-org committed Dec 26, 2023
1 parent d6b1aa0 commit 76d2461
Show file tree
Hide file tree
Showing 4 changed files with 234 additions and 27 deletions.
6 changes: 3 additions & 3 deletions src/MDriveSync.Client.API/Properties/launchSettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"http": {
"commandName": "Project",
"launchBrowser": true,
"launchUrl": "swagger",
"launchUrl": "",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
},
Expand All @@ -13,15 +13,15 @@
"IIS Express": {
"commandName": "IISExpress",
"launchBrowser": true,
"launchUrl": "swagger",
"launchUrl": "",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
}
},
"Docker": {
"commandName": "Docker",
"launchBrowser": true,
"launchUrl": "{Scheme}://{ServiceHost}:{ServicePort}/swagger",
"launchUrl": "{Scheme}://{ServiceHost}:{ServicePort}",
"environmentVariables": {
"ASPNETCORE_HTTP_PORTS": "8080"
},
Expand Down
77 changes: 77 additions & 0 deletions src/MDriveSync.Core/Services/AsyncLock.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
using System.Collections.Concurrent;

namespace MDriveSync.Core.Services
{
/// <summary>
/// 异步简单锁
/// </summary>
public class AsyncLock
{
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
private readonly Task<IDisposable> _releaser;

public AsyncLock()
{
_releaser = Task.FromResult((IDisposable)new Releaser(this));
}

public async Task<IDisposable> LockAsync()
{
await _semaphore.WaitAsync();
return _releaser.Result;
}

private class Releaser : IDisposable
{
private readonly AsyncLock _toRelease;

internal Releaser(AsyncLock toRelease)
{
_toRelease = toRelease;
}

public void Dispose()
{
_toRelease._semaphore.Release();
}
}
}

/// <summary>
/// 异步资源锁
/// </summary>
public class AsyncLockV2
{
private readonly ConcurrentDictionary<string, SemaphoreSlim> _semaphoreDictionary = new();

public async Task<IDisposable> LockAsync(string resource = "")
{
var semaphore = _semaphoreDictionary.GetOrAdd(resource, k => new SemaphoreSlim(1, 1));
await semaphore.WaitAsync();
return new Releaser(() => Release(resource));
}

private void Release(string resource)
{
if (_semaphoreDictionary.TryGetValue(resource, out var semaphore))
{
semaphore.Release();
}
}

private class Releaser : IDisposable
{
private readonly Action _releaseAction;

internal Releaser(Action releaseAction)
{
_releaseAction = releaseAction;
}

public void Dispose()
{
_releaseAction();
}
}
}
}
112 changes: 88 additions & 24 deletions src/MDriveSync.Core/Services/Job.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ namespace MDriveSync.Core
///
/// TODO
/// 文件监听变换时,移除 sha1 缓存,下次同步时重新计算
/// 同步文件夹,目前文件夹没有同步
///
///
/// TODO:
/// OK: 令牌自动刷新
/// job 管理
Expand Down Expand Up @@ -56,11 +55,27 @@ namespace MDriveSync.Core
/// </summary>
public class Job : IDisposable
{
private readonly ILogger _log;
/// <summary>
/// 本地文件锁
/// </summary>
private readonly object _localLock = new();

/// <summary>
/// 例行检查锁
/// </summary>
private readonly SemaphoreSlim _maintenanceLock = new(1, 1);

/// <summary>
/// 异步锁
/// </summary>
private AsyncLockV2 _lock = new();

private readonly object _lock = new();
///// <summary>
///// 云盘文件夹创建锁
///// </summary>
//private readonly AsyncLock _createFolderLock = new();

private readonly SemaphoreSlim _semaphoreSlim = new SemaphoreSlim(1, 1);
private readonly ILogger _log;

private readonly IMemoryCache _cache;

Expand Down Expand Up @@ -209,13 +224,13 @@ public Job(AliyunDriveConfig driveConfig, JobConfig jobConfig, ILogger log)
/// <returns></returns>
public async Task Maintenance()
{
if (_semaphoreSlim.CurrentCount == 0)
if (_maintenanceLock.CurrentCount == 0)
{
// 如果检查执行中,则跳过,保证执行中的只有 1 个
return;
}

await _semaphoreSlim.WaitAsync();
await _maintenanceLock.WaitAsync();

try
{
Expand Down Expand Up @@ -310,7 +325,7 @@ public async Task Maintenance()
{
GC.Collect();

_semaphoreSlim.Release();
_maintenanceLock.Release();
}
}

Expand Down Expand Up @@ -472,7 +487,7 @@ private void PersistentDoWork(object state)
if (!_isLoadLocalFiles)
return;

lock (_lock)
lock (_localLock)
{
var dir = Path.GetDirectoryName(_localFileCacheName);
if (!Directory.Exists(dir))
Expand Down Expand Up @@ -660,7 +675,55 @@ private async Task SyncFiles()
var options = new ParallelOptions() { MaxDegreeOfParallelism = processorCount };

var process = 0;
var total = _localFiles.Count;
var total = _localFolders.Count;

// 并行创建文件夹
await Parallel.ForEachAsync(_localFolders, options, async (item, cancellationToken) =>
{
try
{
// 计算存储目录
var saveParentPath = $"{_driveSavePath}/{item.Key}".TrimPath();

// 存储目录 ID
var saveParentFileId = "root";

// 判断云盘是否存在路径,不存在则创建
if (!string.IsNullOrWhiteSpace(saveParentPath))
{
if (!_driveFolders.ContainsKey(saveParentPath))
{
var savePaths = saveParentPath.Split(new char[] { '/' }, StringSplitOptions.RemoveEmptyEntries);
var savePathsParentFileId = "root";
foreach (var subPath in savePaths)
{
savePathsParentFileId = await AliyunDriveCreateFolder(subPath, savePathsParentFileId);
}
}

if (!_driveFolders.ContainsKey(saveParentPath))
{
throw new Exception("文件夹创建失败");
}

saveParentFileId = _driveFolders[saveParentPath].FileId;
}
}
catch (Exception ex)
{
_log.LogError(ex, $"文件上传处理异常 {item.Value.FullPath}");
}
finally
{
Interlocked.Increment(ref process);
_log.LogInformation($"同步文件夹中 {process}/{total},用时:{(DateTime.Now - now).TotalMilliseconds}ms,{item.Key}");
}
});
_log.LogInformation($"同步文件夹完成,总文件夹数:{_localFiles.Count},用时:{(DateTime.Now - now).TotalMilliseconds}ms");

now = DateTime.Now;
process = 0;
total = _localFiles.Count;

// 并行上传
await Parallel.ForEachAsync(_localFiles, options, async (item, cancellationToken) =>
Expand All @@ -676,11 +739,11 @@ await Parallel.ForEachAsync(_localFiles, options, async (item, cancellationToken
finally
{
Interlocked.Increment(ref process);
_log.LogInformation($"同步中 {process}/{total},用时:{(DateTime.Now - now).TotalMilliseconds}ms,{item.Key}");
_log.LogInformation($"同步文件中 {process}/{total},用时:{(DateTime.Now - now).TotalMilliseconds}ms,{item.Key}");
}
});

_log.LogInformation($"同步完成,总文件数:{_localFiles.Count},用时:{(DateTime.Now - now).TotalMilliseconds}ms");
_log.LogInformation($"同步文件完成,总文件数:{_localFiles.Count},用时:{(DateTime.Now - now).TotalMilliseconds}ms");
}

/// <summary>
Expand Down Expand Up @@ -833,7 +896,7 @@ void Load(string dir)
var savePath = Path.Combine(_localRestorePath, Path.Combine(subPaths));

var tmpPath = Path.GetDirectoryName(savePath);
lock (_lock)
lock (_localLock)
{
if (!Directory.Exists(tmpPath))
{
Expand Down Expand Up @@ -1690,7 +1753,7 @@ private async Task AliyunDriveDownload(string url, string fileName, string fileS
var tmpPath = Path.GetDirectoryName(tempFilePath);
var path = Path.GetDirectoryName(finalFilePath);

lock (_lock)
lock (_localLock)
{
if (!Directory.Exists(tmpPath))
{
Expand Down Expand Up @@ -1743,17 +1806,18 @@ private async Task AliyunDriveDownload(string url, string fileName, string fileS
}

/// <summary>
/// 阿里云盘 - 创建文件夹(同名不覆盖
/// 阿里云盘 - 创建文件夹(同名不创建
/// </summary>
/// <param name="filePath"></param>
/// <param name="parentId"></param>
/// <returns></returns>
private async Task<string> AliyunDriveCreateFolder(string filePath, string parentId)
{
var name = AliyunDriveHelper.EncodeFileName(filePath);

try
// 同一级文件夹共用一个锁
using (await _lock.LockAsync($"create_folder_{parentId}"))
{
var name = AliyunDriveHelper.EncodeFileName(filePath);

// 判断是否需要创建文件夹
if (parentId == "root")
{
Expand Down Expand Up @@ -1822,10 +1886,6 @@ private async Task<string> AliyunDriveCreateFolder(string filePath, string paren

return data.FileId;
}
catch (Exception)
{
throw;
}
}

/// <summary>
Expand Down Expand Up @@ -2175,6 +2235,7 @@ private async Task AliyunDriveFetchAllFiles(string driveId, string parentFileId,
// 等待 250ms 以遵守限流策略
if (sw.ElapsedMilliseconds < _listRequestInterval)
await Task.Delay((int)(_listRequestInterval - sw.ElapsedMilliseconds));

} while (!string.IsNullOrEmpty(marker));

foreach (var item in allItems)
Expand Down Expand Up @@ -2272,7 +2333,10 @@ private async Task<RestResponse<T>> AliyunDriveExecuteWithRetry<T>(RestRequest r
else if (response.StatusCode == HttpStatusCode.TooManyRequests)
{
retries++;
_log.LogWarning("请求次数过多,第 {@0} 次: {@1}", retries, request.Resource);

// 其他API加一起有个10秒150次的限制。可以根据429和 x-retry - after 头部来判断等待重试的时间

_log.LogWarning("触发限流,请求次数过多,重试第 {@0} 次: {@1}", retries, request.Resource);

if (retries >= maxRetries)
{
Expand Down Expand Up @@ -2385,7 +2449,7 @@ private async Task AliyunDriveInitBackupPath()
query = $"parent_file_id='{searchParentFileId}' and type = 'folder' and name = '{subPath}'"
};
request.AddBody(body);
var response = await _apiClient.ExecuteAsync<AliyunFileList>(request);
var response = await AliyunDriveExecuteWithRetry<AliyunFileList>(request);
if (response.StatusCode != HttpStatusCode.OK)
{
throw response.ErrorException ?? new Exception(response.Content ?? "获取云盘目录失败");
Expand Down
Loading

0 comments on commit 76d2461

Please sign in to comment.