From 4cafdf510a83df5ff4fc937911645498738f0642 Mon Sep 17 00:00:00 2001 From: Ivan Druzhitskiy Date: Tue, 26 Dec 2023 15:43:14 +0300 Subject: [PATCH 01/21] ClusterModifier: remove multiple node requests --- src/ClusterModifier/ClusterExpander.cs | 43 +++++++++++++------------- 1 file changed, 22 insertions(+), 21 deletions(-) diff --git a/src/ClusterModifier/ClusterExpander.cs b/src/ClusterModifier/ClusterExpander.cs index e7898c2..4550256 100644 --- a/src/ClusterModifier/ClusterExpander.cs +++ b/src/ClusterModifier/ClusterExpander.cs @@ -1,5 +1,4 @@ -using System; -using System.Collections.Generic; +using System.Collections.Generic; using System.IO; using System.Linq; using System.Net; @@ -44,18 +43,19 @@ public async Task ExpandCluster(CancellationToken cancellationToken) _logger.LogDebug("Expanding cluster from {OldConfigPath} to {CurrentConfigPath}", _args.OldConfigPath, _args.ClusterConfigPath); - var dirsToDelete = await FindDirsToDelete(oldConfig, config, cancellationToken); - var copyOperations = await CopyDataToNewReplicas(oldConfig, config, dirsToDelete, cancellationToken); + var vDiskInfo = await GetVDiskInfo(oldConfig, config, cancellationToken); + var dirsToDelete = FindDirsToDelete(vDiskInfo); + var copyOperations = await CopyDataToNewReplicas(vDiskInfo, dirsToDelete, cancellationToken); if (_args.RemoveUnusedReplicas) await RemoveUnusedReplicas(oldConfig, config, copyOperations, dirsToDelete, cancellationToken); } - private async Task> CopyDataToNewReplicas(ClusterConfiguration oldConfig, ClusterConfiguration config, + private async Task> CopyDataToNewReplicas(List vDiskInfo, HashSet dirsToDelete, CancellationToken cancellationToken) { _logger.LogInformation("Copying data from old to current replicas"); - var sourceDirsByDest = await GetSourceDirsByDestination(oldConfig, config, cancellationToken); + var sourceDirsByDest = GetSourceDirsByDestination(vDiskInfo); var operations = CollectOperations(sourceDirsByDest, dirsToDelete); if (!_args.DryRun) { @@ -73,8 +73,7 @@ public async Task ExpandCluster(CancellationToken cancellationToken) return operations; } - private async Task OnVDiskDirs(ClusterConfiguration oldConfig, ClusterConfiguration config, - Action, IEnumerable> onOldNewDirsForVdisk, + private async Task> GetVDiskInfo(ClusterConfiguration oldConfig, ClusterConfiguration config, CancellationToken cancellationToken) { var vDiskPairs = oldConfig.VDisks.Join(config.VDisks, @@ -83,27 +82,30 @@ private async Task OnVDiskDirs(ClusterConfiguration oldConfig, ClusterConfigurat (ovd, vd) => (ovd, vd)); var oldNodeInfoByName = await GetNodeInfoByName(oldConfig, cancellationToken); var nodeInfoByName = await GetNodeInfoByName(config, cancellationToken); + var result = new List(); foreach (var (oldVDisk, vDisk) in vDiskPairs) { var oldDirs = oldVDisk.Replicas.Select(r => oldNodeInfoByName[r.Node].GetRemoteDirForDisk(r.Disk, oldVDisk)); var newDirs = vDisk.Replicas.Select(r => nodeInfoByName[r.Node].GetRemoteDirForDisk(r.Disk, vDisk)); - onOldNewDirsForVdisk(vDisk, oldDirs, newDirs); + result.Add(new VDiskInfo(vDisk, oldDirs.ToArray(), newDirs.ToArray())); } + return result; } - private async Task>> GetSourceDirsByDestination(ClusterConfiguration oldConfig, - ClusterConfiguration config, CancellationToken cancellationToken) + public record struct VDiskInfo(ClusterConfiguration.VDisk VDisk, RemoteDir[] OldDirs, RemoteDir[] NewDirs); + + private Dictionary> GetSourceDirsByDestination(List vDiskInfo) { var sourceDirsByDest = new Dictionary>(); - await OnVDiskDirs(oldConfig, config, (_, oldDirs, newDirs) => + foreach(var info in vDiskInfo) { - var missing = newDirs.Except(oldDirs); + var missing = info.NewDirs.Except(info.OldDirs); foreach (var newDir in missing) if (sourceDirsByDest.TryGetValue(newDir, out var dirs)) - dirs.UnionWith(oldDirs); + dirs.UnionWith(info.OldDirs); else - sourceDirsByDest.Add(newDir, oldDirs.ToHashSet()); - }, cancellationToken); + sourceDirsByDest.Add(newDir, info.OldDirs.ToHashSet()); + } return sourceDirsByDest; } @@ -146,14 +148,13 @@ private async Task> GetNodeInfoByName(ClusterConfig return nodeInfoByName; } - private async Task> FindDirsToDelete(ClusterConfiguration oldConfig, ClusterConfiguration newConfig, - CancellationToken cancellationToken) + private HashSet FindDirsToDelete(List vDiskInfo) { var result = new HashSet(); - await OnVDiskDirs(oldConfig, newConfig, (vDisk, oldDirs, newDirs) => + foreach(var info in vDiskInfo) { - result.UnionWith(oldDirs.Except(newDirs)); - }, cancellationToken); + result.UnionWith(info.OldDirs.Except(info.NewDirs)); + } return result; } From de7ccb257dda3837e69dc65bb67704197ef9ca38 Mon Sep 17 00:00:00 2001 From: Ivan Druzhitskiy Date: Tue, 26 Dec 2023 15:48:28 +0300 Subject: [PATCH 02/21] ClusterModifier: extract copy operation --- src/ClusterModifier/ClusterExpander.cs | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/src/ClusterModifier/ClusterExpander.cs b/src/ClusterModifier/ClusterExpander.cs index 4550256..272daed 100644 --- a/src/ClusterModifier/ClusterExpander.cs +++ b/src/ClusterModifier/ClusterExpander.cs @@ -51,7 +51,7 @@ public async Task ExpandCluster(CancellationToken cancellationToken) await RemoveUnusedReplicas(oldConfig, config, copyOperations, dirsToDelete, cancellationToken); } - private async Task> CopyDataToNewReplicas(List vDiskInfo, + private async Task> CopyDataToNewReplicas(List vDiskInfo, HashSet dirsToDelete, CancellationToken cancellationToken) { _logger.LogInformation("Copying data from old to current replicas"); @@ -60,8 +60,8 @@ public async Task ExpandCluster(CancellationToken cancellationToken) if (!_args.DryRun) { var parallelOperations = operations - .Select(op => ParallelP2PProcessor.CreateOperation(op.from.Address, op.to.Address, - () => Copy(op.from, op.to, cancellationToken))) + .Select(op => ParallelP2PProcessor.CreateOperation(op.From.Address, op.To.Address, + () => Copy(op, cancellationToken))) .ToArray(); await _parallelP2PProcessor.Invoke(_args.CopyParallelDegree, parallelOperations, cancellationToken); } @@ -109,7 +109,7 @@ private Dictionary> GetSourceDirsByDestination(Lis return sourceDirsByDest; } - private static List<(RemoteDir from, RemoteDir to)> CollectOperations(Dictionary> sourceDirsByDest, + private static List CollectOperations(Dictionary> sourceDirsByDest, HashSet dirsToDelete) { var loadCountByAddress = new Dictionary(); @@ -120,7 +120,7 @@ private Dictionary> GetSourceDirsByDestination(Lis loadCountByAddress[src.Address] = 0; loadCountByDir[src] = 0; } - var operations = new List<(RemoteDir, RemoteDir)>(); + var operations = new List(); foreach (var (dest, sources) in sourceDirsByDest.OrderBy(kv => kv.Key.Address.ToString()).ThenBy(kv => kv.Key.Path)) { var bestSource = sources @@ -129,12 +129,14 @@ private Dictionary> GetSourceDirsByDestination(Lis .ThenBy(rd => rd.Address.ToString()).ThenBy(rd => rd.Path).First(); loadCountByAddress[bestSource.Address]++; loadCountByDir[bestSource]++; - operations.Add((bestSource, dest)); + operations.Add(new CopyOperation(bestSource, dest)); } return operations; } + public record struct CopyOperation(RemoteDir From, RemoteDir To); + private async Task> GetNodeInfoByName(ClusterConfiguration config, CancellationToken cancellationToken) { var nodeInfoByName = new Dictionary(); @@ -159,15 +161,15 @@ private HashSet FindDirsToDelete(List vDiskInfo) } private async Task RemoveUnusedReplicas(ClusterConfiguration oldConfig, ClusterConfiguration newConfig, - IEnumerable<(RemoteDir from, RemoteDir to)> copyOperations, HashSet dirsToDelete, + IEnumerable copyOperations, HashSet dirsToDelete, CancellationToken cancellationToken) { _logger.LogInformation("Removing data from old replicas"); var newDirsByOldDir = new Dictionary(); var oldDirsToDeleteWithoutCopy = new List(); var copiedNewByOldDir = copyOperations - .GroupBy(t => t.from) - .ToDictionary(g => g.Key, g => g.Select(t => t.to).Distinct().ToArray()); + .GroupBy(t => t.From) + .ToDictionary(g => g.Key, g => g.Select(t => t.To).Distinct().ToArray()); foreach(var oldDir in dirsToDelete) { if (copiedNewByOldDir.TryGetValue(oldDir, out var copiedNewDirs)) @@ -234,11 +236,11 @@ private async Task RemoveUnusedReplicas(ClusterConfiguration oldConfig, ClusterC } } - private async Task Copy(RemoteDir from, RemoteDir to, CancellationToken cancellationToken) + private async Task Copy(CopyOperation op, CancellationToken cancellationToken) { - var copyResult = await _remoteFileCopier.Copy(from, to, cancellationToken); + var copyResult = await _remoteFileCopier.Copy(op.From, op.To, cancellationToken); if (copyResult.IsError) - throw new OperationException($"Failed to copy data from {from} to {to}"); + throw new OperationException($"Failed to copy data from {op.From} to {op.To}"); return true; } From cb1190c00b69503112ad7cb95cd7059c38ef0160 Mon Sep 17 00:00:00 2001 From: Ivan Druzhitskiy Date: Tue, 26 Dec 2023 15:51:34 +0300 Subject: [PATCH 03/21] ClusterModifier: split finding ops and execution --- src/ClusterModifier/ClusterExpander.cs | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/src/ClusterModifier/ClusterExpander.cs b/src/ClusterModifier/ClusterExpander.cs index 272daed..063d07b 100644 --- a/src/ClusterModifier/ClusterExpander.cs +++ b/src/ClusterModifier/ClusterExpander.cs @@ -44,19 +44,18 @@ public async Task ExpandCluster(CancellationToken cancellationToken) _args.OldConfigPath, _args.ClusterConfigPath); var vDiskInfo = await GetVDiskInfo(oldConfig, config, cancellationToken); - var dirsToDelete = FindDirsToDelete(vDiskInfo); - var copyOperations = await CopyDataToNewReplicas(vDiskInfo, dirsToDelete, cancellationToken); + var dirsToDelete = GetDirsToDelete(vDiskInfo); + var copyOperations = GetCopyOperations(vDiskInfo, dirsToDelete); + + await CopyDataToNewReplicas(copyOperations, cancellationToken); if (_args.RemoveUnusedReplicas) - await RemoveUnusedReplicas(oldConfig, config, copyOperations, dirsToDelete, cancellationToken); + await RemoveUnusedReplicas(copyOperations, dirsToDelete, cancellationToken); } - private async Task> CopyDataToNewReplicas(List vDiskInfo, - HashSet dirsToDelete, CancellationToken cancellationToken) + private async Task CopyDataToNewReplicas(List operations, CancellationToken cancellationToken) { _logger.LogInformation("Copying data from old to current replicas"); - var sourceDirsByDest = GetSourceDirsByDestination(vDiskInfo); - var operations = CollectOperations(sourceDirsByDest, dirsToDelete); if (!_args.DryRun) { var parallelOperations = operations @@ -70,7 +69,12 @@ private async Task> CopyDataToNewReplicas(List vD foreach(var (from, to) in operations) _logger.LogInformation("Expected copying from {From} to {To}", from, to); } - return operations; + } + + private List GetCopyOperations(List vDiskInfo, HashSet dirsToDelete) + { + var sourceDirsByDest = GetSourceDirsByDestination(vDiskInfo); + return CollectOperations(sourceDirsByDest, dirsToDelete); } private async Task> GetVDiskInfo(ClusterConfiguration oldConfig, ClusterConfiguration config, @@ -150,7 +154,7 @@ private async Task> GetNodeInfoByName(ClusterConfig return nodeInfoByName; } - private HashSet FindDirsToDelete(List vDiskInfo) + private HashSet GetDirsToDelete(List vDiskInfo) { var result = new HashSet(); foreach(var info in vDiskInfo) @@ -160,8 +164,7 @@ private HashSet FindDirsToDelete(List vDiskInfo) return result; } - private async Task RemoveUnusedReplicas(ClusterConfiguration oldConfig, ClusterConfiguration newConfig, - IEnumerable copyOperations, HashSet dirsToDelete, + private async Task RemoveUnusedReplicas(IEnumerable copyOperations, HashSet dirsToDelete, CancellationToken cancellationToken) { _logger.LogInformation("Removing data from old replicas"); From 2a90db459ca1c3fc82a1252f7badc6ebdc4f885c Mon Sep 17 00:00:00 2001 From: Ivan Druzhitskiy Date: Tue, 26 Dec 2023 18:05:00 +0300 Subject: [PATCH 04/21] ClusterModifier: refactor delete --- src/ClusterModifier/ClusterExpander.cs | 110 ++++++++++++++----------- 1 file changed, 62 insertions(+), 48 deletions(-) diff --git a/src/ClusterModifier/ClusterExpander.cs b/src/ClusterModifier/ClusterExpander.cs index 063d07b..b9be552 100644 --- a/src/ClusterModifier/ClusterExpander.cs +++ b/src/ClusterModifier/ClusterExpander.cs @@ -32,10 +32,31 @@ public ClusterExpander(ILogger logger, } public async Task ExpandCluster(CancellationToken cancellationToken) + { + var vDiskInfo = await GetVDiskInfo(cancellationToken); + var dirsToDelete = GetDirsToDelete(vDiskInfo); + var copyOperations = GetCopyOperations(vDiskInfo, dirsToDelete); + + await CopyDataToNewReplicas(copyOperations, cancellationToken); + + if (_args.RemoveUnusedReplicas) + { + var confirmedDeleteOperations = GetConfirmedDeleteOperations(copyOperations, dirsToDelete); + if (await InvokeConfirmedDeleteOperations(confirmedDeleteOperations, cancellationToken) + || _args.ForceRemoveUncopiedUnusedReplicas) + { + var unconfirmedDirs = GetUnconfirmedDeleteDirs(copyOperations, dirsToDelete); + await InvokeUnconfirmedDeleteOperations(unconfirmedDirs, cancellationToken); + } + } + } + + private async Task> GetVDiskInfo(CancellationToken cancellationToken) { var oldConfigResult = await _args.GetClusterConfigurationFromFile(_args.OldConfigPath, cancellationToken); if (!oldConfigResult.IsOk(out var oldConfig, out var oldError)) throw new ConfigurationException($"Old config is not available: {oldError}"); + var configResult = await _args.FindClusterConfiguration(cancellationToken: cancellationToken); if (!configResult.IsOk(out var config, out var newError)) throw new ConfigurationException($"Current config is not available: {newError}"); @@ -43,14 +64,7 @@ public async Task ExpandCluster(CancellationToken cancellationToken) _logger.LogDebug("Expanding cluster from {OldConfigPath} to {CurrentConfigPath}", _args.OldConfigPath, _args.ClusterConfigPath); - var vDiskInfo = await GetVDiskInfo(oldConfig, config, cancellationToken); - var dirsToDelete = GetDirsToDelete(vDiskInfo); - var copyOperations = GetCopyOperations(vDiskInfo, dirsToDelete); - - await CopyDataToNewReplicas(copyOperations, cancellationToken); - - if (_args.RemoveUnusedReplicas) - await RemoveUnusedReplicas(copyOperations, dirsToDelete, cancellationToken); + return await GetVDiskInfo(oldConfig, config, cancellationToken); } private async Task CopyDataToNewReplicas(List operations, CancellationToken cancellationToken) @@ -164,12 +178,10 @@ private HashSet GetDirsToDelete(List vDiskInfo) return result; } - private async Task RemoveUnusedReplicas(IEnumerable copyOperations, HashSet dirsToDelete, - CancellationToken cancellationToken) + public record struct ConfirmedDeleteOperation(RemoteDir DirToDelete, RemoteDir[] Copies); + private List GetConfirmedDeleteOperations(List copyOperations, HashSet dirsToDelete) { - _logger.LogInformation("Removing data from old replicas"); - var newDirsByOldDir = new Dictionary(); - var oldDirsToDeleteWithoutCopy = new List(); + var result = new List(); var copiedNewByOldDir = copyOperations .GroupBy(t => t.From) .ToDictionary(g => g.Key, g => g.Select(t => t.To).Distinct().ToArray()); @@ -177,66 +189,68 @@ private async Task RemoveUnusedReplicas(IEnumerable copyOperation { if (copiedNewByOldDir.TryGetValue(oldDir, out var copiedNewDirs)) { - newDirsByOldDir[oldDir] = copiedNewDirs; + result.Add(new ConfirmedDeleteOperation(oldDir, copiedNewDirs)); } + } + return result; + } + + private List GetUnconfirmedDeleteDirs(List copyOperations, HashSet dirsToDelete) + { + var copied = copyOperations.Select(o => o.From).ToHashSet(); + return dirsToDelete.Except(copied).ToList(); + } + + private async Task InvokeUnconfirmedDeleteOperations(List unconfirmedDirsToRemove, CancellationToken cancellationToken) + { + foreach (var dir in unconfirmedDirsToRemove) + { + if (_args.DryRun) + _logger.LogInformation("Expected removing files from {Directory} (directory has no replicas)", dir); else { - oldDirsToDeleteWithoutCopy.Add(oldDir); + if (await _remoteFileCopier.RemoveInDir(dir, cancellationToken)) + _logger.LogInformation("Removed directory {From}", dir); + else + _logger.LogError("Failed to remove directory {From}", dir); } } - bool errorOccured = false; - foreach (var (oldDirToDelete, newDirs) in newDirsByOldDir) + } + + private async Task InvokeConfirmedDeleteOperations(List operations, CancellationToken cancellationToken) + { + bool noErrors = true; + foreach (var op in operations) { if (_args.DryRun) - _logger.LogInformation("Expected removing files from {Directory}", oldDirToDelete); + _logger.LogInformation("Expected removing files from {Directory}", op.DirToDelete); else { bool deleteAllowed = true; - foreach(var newDir in newDirs) + foreach(var copy in op.Copies) { - if (!await _remoteFileCopier.SourceCopiedToDest(oldDirToDelete, newDir, cancellationToken)) + if (!await _remoteFileCopier.SourceCopiedToDest(op.DirToDelete, copy, cancellationToken)) { - errorOccured = true; + noErrors = false; _logger.LogError("Directories {From} and {To} contain different files, directory {From} can't be removed", - oldDirToDelete, newDir, oldDirToDelete); + op.DirToDelete, copy, op.DirToDelete); deleteAllowed = false; break; } } if (deleteAllowed) { - if (await _remoteFileCopier.RemoveInDir(oldDirToDelete, cancellationToken)) - _logger.LogInformation("Removed directory {From}", oldDirToDelete); - else - { - errorOccured = true; - _logger.LogError("Failed to remove directory {From}", oldDirToDelete); - } - } - } - } - if (oldDirsToDeleteWithoutCopy.Count > 0) - { - if (errorOccured && !_args.ForceRemoveUncopiedUnusedReplicas) - { - _logger.LogError("Error occured during removal of unused replicas with copies, will not remove replicas without copies"); - } - else - { - foreach (var oldDir in oldDirsToDeleteWithoutCopy) - { - if (_args.DryRun) - _logger.LogInformation("Expected removing files from {Directory} (directory has no replicas)", oldDir); + if (await _remoteFileCopier.RemoveInDir(op.DirToDelete, cancellationToken)) + _logger.LogInformation("Removed directory {From}", op.DirToDelete); else { - if (await _remoteFileCopier.RemoveInDir(oldDir, cancellationToken)) - _logger.LogInformation("Removed directory {From}", oldDir); - else - _logger.LogError("Failed to remove directory {From}", oldDir); + noErrors = false; + _logger.LogError("Failed to remove directory {From}", op.DirToDelete); } } } } + return noErrors; } private async Task Copy(CopyOperation op, CancellationToken cancellationToken) From 7dd1551705210c169689df6ed4c9c9753d5d0bc5 Mon Sep 17 00:00:00 2001 From: Ivan Druzhitskiy Date: Tue, 26 Dec 2023 18:28:21 +0300 Subject: [PATCH 05/21] ClusterModifier: split actions between classes --- src/ClusterModifier/ClusterExpander.cs | 354 ++---------------- src/ClusterModifier/ClusterStateFinder.cs | 167 +++++++++ src/ClusterModifier/Copier.cs | 68 ++++ src/ClusterModifier/Program.cs | 48 ++- src/ClusterModifier/Remover.cs | 95 +++++ .../WorkSpecificationFinder.cs | 126 +++++++ 6 files changed, 518 insertions(+), 340 deletions(-) create mode 100644 src/ClusterModifier/ClusterStateFinder.cs create mode 100644 src/ClusterModifier/Copier.cs create mode 100644 src/ClusterModifier/Remover.cs create mode 100644 src/ClusterModifier/WorkSpecificationFinder.cs diff --git a/src/ClusterModifier/ClusterExpander.cs b/src/ClusterModifier/ClusterExpander.cs index b9be552..e4fe32f 100644 --- a/src/ClusterModifier/ClusterExpander.cs +++ b/src/ClusterModifier/ClusterExpander.cs @@ -1,328 +1,52 @@ -using System.Collections.Generic; -using System.IO; -using System.Linq; -using System.Net; -using System.Threading; +using System.Threading; using System.Threading.Tasks; -using BobApi.BobEntities; -using BobToolsCli.Exceptions; -using BobToolsCli.Helpers; -using Microsoft.Extensions.Logging; -using RemoteFileCopy; -using RemoteFileCopy.Entities; -namespace ClusterModifier +namespace ClusterModifier; + +public class ClusterExpander { - public class ClusterExpander + private readonly ClusterStateFinder _clusterStateFinder; + private readonly WorkSpecificationFinder _workSpecificationFinder; + private readonly Copier _copier; + private readonly Remover _remover; + private readonly ClusterExpandArguments _args; + + public ClusterExpander( + ClusterStateFinder clusterStateFinder, + WorkSpecificationFinder workSpecificationFinder, + Copier copier, + Remover remover, + ClusterExpandArguments args + ) { - private readonly ILogger _logger; - private readonly IRemoteFileCopier _remoteFileCopier; - private readonly ParallelP2PProcessor _parallelP2PProcessor; - private readonly ClusterExpandArguments _args; - - public ClusterExpander(ILogger logger, - IRemoteFileCopier remoteFileCopier, - ParallelP2PProcessor parallelP2PProcessor, - ClusterExpandArguments args) - { - _logger = logger; - _remoteFileCopier = remoteFileCopier; - _parallelP2PProcessor = parallelP2PProcessor; - _args = args; - } - - public async Task ExpandCluster(CancellationToken cancellationToken) - { - var vDiskInfo = await GetVDiskInfo(cancellationToken); - var dirsToDelete = GetDirsToDelete(vDiskInfo); - var copyOperations = GetCopyOperations(vDiskInfo, dirsToDelete); - - await CopyDataToNewReplicas(copyOperations, cancellationToken); - - if (_args.RemoveUnusedReplicas) - { - var confirmedDeleteOperations = GetConfirmedDeleteOperations(copyOperations, dirsToDelete); - if (await InvokeConfirmedDeleteOperations(confirmedDeleteOperations, cancellationToken) - || _args.ForceRemoveUncopiedUnusedReplicas) - { - var unconfirmedDirs = GetUnconfirmedDeleteDirs(copyOperations, dirsToDelete); - await InvokeUnconfirmedDeleteOperations(unconfirmedDirs, cancellationToken); - } - } - } - - private async Task> GetVDiskInfo(CancellationToken cancellationToken) - { - var oldConfigResult = await _args.GetClusterConfigurationFromFile(_args.OldConfigPath, cancellationToken); - if (!oldConfigResult.IsOk(out var oldConfig, out var oldError)) - throw new ConfigurationException($"Old config is not available: {oldError}"); - - var configResult = await _args.FindClusterConfiguration(cancellationToken: cancellationToken); - if (!configResult.IsOk(out var config, out var newError)) - throw new ConfigurationException($"Current config is not available: {newError}"); - - _logger.LogDebug("Expanding cluster from {OldConfigPath} to {CurrentConfigPath}", - _args.OldConfigPath, _args.ClusterConfigPath); - - return await GetVDiskInfo(oldConfig, config, cancellationToken); - } - - private async Task CopyDataToNewReplicas(List operations, CancellationToken cancellationToken) - { - _logger.LogInformation("Copying data from old to current replicas"); - if (!_args.DryRun) - { - var parallelOperations = operations - .Select(op => ParallelP2PProcessor.CreateOperation(op.From.Address, op.To.Address, - () => Copy(op, cancellationToken))) - .ToArray(); - await _parallelP2PProcessor.Invoke(_args.CopyParallelDegree, parallelOperations, cancellationToken); - } - else - { - foreach(var (from, to) in operations) - _logger.LogInformation("Expected copying from {From} to {To}", from, to); - } - } - - private List GetCopyOperations(List vDiskInfo, HashSet dirsToDelete) - { - var sourceDirsByDest = GetSourceDirsByDestination(vDiskInfo); - return CollectOperations(sourceDirsByDest, dirsToDelete); - } - - private async Task> GetVDiskInfo(ClusterConfiguration oldConfig, ClusterConfiguration config, - CancellationToken cancellationToken) - { - var vDiskPairs = oldConfig.VDisks.Join(config.VDisks, - vd => vd.Id, - vd => vd.Id, - (ovd, vd) => (ovd, vd)); - var oldNodeInfoByName = await GetNodeInfoByName(oldConfig, cancellationToken); - var nodeInfoByName = await GetNodeInfoByName(config, cancellationToken); - var result = new List(); - foreach (var (oldVDisk, vDisk) in vDiskPairs) - { - var oldDirs = oldVDisk.Replicas.Select(r => oldNodeInfoByName[r.Node].GetRemoteDirForDisk(r.Disk, oldVDisk)); - var newDirs = vDisk.Replicas.Select(r => nodeInfoByName[r.Node].GetRemoteDirForDisk(r.Disk, vDisk)); - result.Add(new VDiskInfo(vDisk, oldDirs.ToArray(), newDirs.ToArray())); - } - return result; - } - - public record struct VDiskInfo(ClusterConfiguration.VDisk VDisk, RemoteDir[] OldDirs, RemoteDir[] NewDirs); - - private Dictionary> GetSourceDirsByDestination(List vDiskInfo) - { - var sourceDirsByDest = new Dictionary>(); - foreach(var info in vDiskInfo) - { - var missing = info.NewDirs.Except(info.OldDirs); - foreach (var newDir in missing) - if (sourceDirsByDest.TryGetValue(newDir, out var dirs)) - dirs.UnionWith(info.OldDirs); - else - sourceDirsByDest.Add(newDir, info.OldDirs.ToHashSet()); - } - return sourceDirsByDest; - } - - private static List CollectOperations(Dictionary> sourceDirsByDest, - HashSet dirsToDelete) - { - var loadCountByAddress = new Dictionary(); - var loadCountByDir = new Dictionary(); - foreach (var sources in sourceDirsByDest.Values) - foreach (var src in sources) - { - loadCountByAddress[src.Address] = 0; - loadCountByDir[src] = 0; - } - var operations = new List(); - foreach (var (dest, sources) in sourceDirsByDest.OrderBy(kv => kv.Key.Address.ToString()).ThenBy(kv => kv.Key.Path)) - { - var bestSource = sources - .OrderByDescending(rd => dirsToDelete.Contains(rd) && loadCountByDir[rd] == 0) - .ThenBy(rd => loadCountByAddress[rd.Address] - (rd.Address == dest.Address ? 1 : 0)) - .ThenBy(rd => rd.Address.ToString()).ThenBy(rd => rd.Path).First(); - loadCountByAddress[bestSource.Address]++; - loadCountByDir[bestSource]++; - operations.Add(new CopyOperation(bestSource, dest)); - } - - return operations; - } - - public record struct CopyOperation(RemoteDir From, RemoteDir To); - - private async Task> GetNodeInfoByName(ClusterConfiguration config, CancellationToken cancellationToken) - { - var nodeInfoByName = new Dictionary(); - foreach (var node in config.Nodes) - { - var cr = await GetCreator(node, cancellationToken); - var disks = node.Disks.ToDictionary(d => d.Name); - nodeInfoByName.Add(node.Name, new NodeInfo(node, cr, disks)); - } - - return nodeInfoByName; - } - - private HashSet GetDirsToDelete(List vDiskInfo) - { - var result = new HashSet(); - foreach(var info in vDiskInfo) - { - result.UnionWith(info.OldDirs.Except(info.NewDirs)); - } - return result; - } - - public record struct ConfirmedDeleteOperation(RemoteDir DirToDelete, RemoteDir[] Copies); - private List GetConfirmedDeleteOperations(List copyOperations, HashSet dirsToDelete) - { - var result = new List(); - var copiedNewByOldDir = copyOperations - .GroupBy(t => t.From) - .ToDictionary(g => g.Key, g => g.Select(t => t.To).Distinct().ToArray()); - foreach(var oldDir in dirsToDelete) - { - if (copiedNewByOldDir.TryGetValue(oldDir, out var copiedNewDirs)) - { - result.Add(new ConfirmedDeleteOperation(oldDir, copiedNewDirs)); - } - } - return result; - } - - private List GetUnconfirmedDeleteDirs(List copyOperations, HashSet dirsToDelete) - { - var copied = copyOperations.Select(o => o.From).ToHashSet(); - return dirsToDelete.Except(copied).ToList(); - } - - private async Task InvokeUnconfirmedDeleteOperations(List unconfirmedDirsToRemove, CancellationToken cancellationToken) - { - foreach (var dir in unconfirmedDirsToRemove) - { - if (_args.DryRun) - _logger.LogInformation("Expected removing files from {Directory} (directory has no replicas)", dir); - else - { - if (await _remoteFileCopier.RemoveInDir(dir, cancellationToken)) - _logger.LogInformation("Removed directory {From}", dir); - else - _logger.LogError("Failed to remove directory {From}", dir); - } - } - } - - private async Task InvokeConfirmedDeleteOperations(List operations, CancellationToken cancellationToken) - { - bool noErrors = true; - foreach (var op in operations) - { - if (_args.DryRun) - _logger.LogInformation("Expected removing files from {Directory}", op.DirToDelete); - else - { - bool deleteAllowed = true; - foreach(var copy in op.Copies) - { - if (!await _remoteFileCopier.SourceCopiedToDest(op.DirToDelete, copy, cancellationToken)) - { - noErrors = false; - _logger.LogError("Directories {From} and {To} contain different files, directory {From} can't be removed", - op.DirToDelete, copy, op.DirToDelete); - deleteAllowed = false; - break; - } - } - if (deleteAllowed) - { - if (await _remoteFileCopier.RemoveInDir(op.DirToDelete, cancellationToken)) - _logger.LogInformation("Removed directory {From}", op.DirToDelete); - else - { - noErrors = false; - _logger.LogError("Failed to remove directory {From}", op.DirToDelete); - } - } - } - } - return noErrors; - } - - private async Task Copy(CopyOperation op, CancellationToken cancellationToken) - { - var copyResult = await _remoteFileCopier.Copy(op.From, op.To, cancellationToken); - if (copyResult.IsError) - throw new OperationException($"Failed to copy data from {op.From} to {op.To}"); - return true; - } - - private async Task> GetAllRemoteDirs(ClusterConfiguration config, CancellationToken cancellationToken) - { - var result = new HashSet(); - foreach (var node in config.Nodes) - { - var creator = await GetCreator(node, cancellationToken); - foreach (var vDisk in config.VDisks) - foreach (var replica in vDisk.Replicas.Where(r => r.Node == node.Name)) - { - var disk = node.Disks.Find(d => d.Name == replica.Disk); - if (disk == null) - throw new ClusterStateException($"Replica {replica} not found"); - - result.Add(creator(disk, vDisk)); - } - } - return result; - } + _clusterStateFinder = clusterStateFinder; + _workSpecificationFinder = workSpecificationFinder; + _copier = copier; + _remover = remover; + _args = args; + } - private delegate RemoteDir GetRemoteDir(ClusterConfiguration.Node.Disk disk, ClusterConfiguration.VDisk vDisk); - private async ValueTask GetCreator( - ClusterConfiguration.Node node, CancellationToken cancellationToken = default) - { - var addr = await node.FindIPAddress(); - var rootDir = await GetRootDir(node, cancellationToken); - return (disk, vdisk) => new RemoteDir(addr, Path.Combine(disk.Path, rootDir, vdisk.Id.ToString())); - } + public async Task ExpandCluster(CancellationToken cancellationToken) + { + var clusterState = await _clusterStateFinder.Find(cancellationToken); + var workSpecification = _workSpecificationFinder.Find(clusterState); - private async ValueTask GetRootDir(ClusterConfiguration.Node node, CancellationToken cancellationToken = default) - { - var rootDir = _args.FindRootDir(node.Name); - if (rootDir == null) - { - var client = _args.GetBobApiClientProvider().GetClient(node); - var nodeConfigResult = await client.GetNodeConfiguration(cancellationToken); - if (nodeConfigResult.IsOk(out var conf, out var error)) - rootDir = conf.RootDir; - else - throw new ClusterStateException($"Node {node.Name} configuration is unavailable: {error}, " + - "and bob-root-dir does not contain enough information"); - } - return rootDir; - } + await _copier.Copy(workSpecification.CopyOperations, cancellationToken); - private readonly struct NodeInfo + if (_args.RemoveUnusedReplicas) { - private readonly GetRemoteDir _getRemoteDir; - private readonly Dictionary _diskByName; - private readonly ClusterConfiguration.Node _node; - - public NodeInfo(ClusterConfiguration.Node node, - GetRemoteDir getRemoteDir, Dictionary diskByName) + if ( + await _remover.RemoveConfirmed( + workSpecification.ConfirmedDeleteOperations, + cancellationToken + ) || _args.ForceRemoveUncopiedUnusedReplicas + ) { - _node = node; - _getRemoteDir = getRemoteDir; - _diskByName = diskByName; + await _remover.RemoveUnconfirmed( + workSpecification.UnconfirmedDeleteDirs, + cancellationToken + ); } - - public RemoteDir GetRemoteDirForDisk(string diskName, ClusterConfiguration.VDisk vDisk) - => _diskByName.TryGetValue(diskName, out var disk) - ? _getRemoteDir(disk, vDisk) - : throw new ClusterStateException($"Disk {diskName} not found on node {_node.Name}"); } } } diff --git a/src/ClusterModifier/ClusterStateFinder.cs b/src/ClusterModifier/ClusterStateFinder.cs new file mode 100644 index 0000000..b42dce3 --- /dev/null +++ b/src/ClusterModifier/ClusterStateFinder.cs @@ -0,0 +1,167 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using BobApi.BobEntities; +using BobToolsCli.Exceptions; +using Microsoft.Extensions.Logging; +using RemoteFileCopy.Entities; + +namespace ClusterModifier; + +public class ClusterStateFinder +{ + private readonly ClusterExpandArguments _args; + private readonly ILogger _logger; + + public ClusterStateFinder(ClusterExpandArguments args, ILogger logger) + { + _args = args; + _logger = logger; + } + + public async Task Find(CancellationToken cancellationToken) + { + return new ClusterState(await GetVDiskInfo(cancellationToken)); + } + + private async Task> GetVDiskInfo(CancellationToken cancellationToken) + { + var oldConfigResult = await _args.GetClusterConfigurationFromFile( + _args.OldConfigPath, + cancellationToken + ); + if (!oldConfigResult.IsOk(out var oldConfig, out var oldError)) + throw new ConfigurationException($"Old config is not available: {oldError}"); + + var configResult = await _args.FindClusterConfiguration( + cancellationToken: cancellationToken + ); + if (!configResult.IsOk(out var config, out var newError)) + throw new ConfigurationException($"Current config is not available: {newError}"); + + _logger.LogDebug( + "Expanding cluster from {OldConfigPath} to {CurrentConfigPath}", + _args.OldConfigPath, + _args.ClusterConfigPath + ); + + return await GetVDiskInfo(oldConfig, config, cancellationToken); + } + + private async Task> GetVDiskInfo( + ClusterConfiguration oldConfig, + ClusterConfiguration config, + CancellationToken cancellationToken + ) + { + var vDiskPairs = oldConfig.VDisks.Join( + config.VDisks, + vd => vd.Id, + vd => vd.Id, + (ovd, vd) => (ovd, vd) + ); + var oldNodeInfoByName = await GetNodeInfoByName(oldConfig, cancellationToken); + var nodeInfoByName = await GetNodeInfoByName(config, cancellationToken); + var result = new List(); + foreach (var (oldVDisk, vDisk) in vDiskPairs) + { + var oldDirs = oldVDisk.Replicas.Select( + r => oldNodeInfoByName[r.Node].GetRemoteDirForDisk(r.Disk, oldVDisk) + ); + var newDirs = vDisk.Replicas.Select( + r => nodeInfoByName[r.Node].GetRemoteDirForDisk(r.Disk, vDisk) + ); + result.Add(new VDiskInfo(vDisk, oldDirs.ToArray(), newDirs.ToArray())); + } + return result; + } + + private async Task> GetNodeInfoByName( + ClusterConfiguration config, + CancellationToken cancellationToken + ) + { + var nodeInfoByName = new Dictionary(); + foreach (var node in config.Nodes) + { + var cr = await GetCreator(node, cancellationToken); + var disks = node.Disks.ToDictionary(d => d.Name); + nodeInfoByName.Add(node.Name, new NodeInfo(node, cr, disks)); + } + + return nodeInfoByName; + } + + private delegate RemoteDir GetRemoteDir( + ClusterConfiguration.Node.Disk disk, + ClusterConfiguration.VDisk vDisk + ); + + private async ValueTask GetCreator( + ClusterConfiguration.Node node, + CancellationToken cancellationToken = default + ) + { + var addr = await node.FindIPAddress(); + var rootDir = await GetRootDir(node, cancellationToken); + return (disk, vdisk) => + new RemoteDir(addr, Path.Combine(disk.Path, rootDir, vdisk.Id.ToString())); + } + + private async ValueTask GetRootDir( + ClusterConfiguration.Node node, + CancellationToken cancellationToken = default + ) + { + var rootDir = _args.FindRootDir(node.Name); + if (rootDir == null) + { + var client = _args.GetBobApiClientProvider().GetClient(node); + var nodeConfigResult = await client.GetNodeConfiguration(cancellationToken); + if (nodeConfigResult.IsOk(out var conf, out var error)) + rootDir = conf.RootDir; + else + throw new ClusterStateException( + $"Node {node.Name} configuration is unavailable: {error}, " + + "and bob-root-dir does not contain enough information" + ); + } + return rootDir; + } + + private readonly struct NodeInfo + { + private readonly GetRemoteDir _getRemoteDir; + private readonly Dictionary _diskByName; + private readonly ClusterConfiguration.Node _node; + + public NodeInfo( + ClusterConfiguration.Node node, + GetRemoteDir getRemoteDir, + Dictionary diskByName + ) + { + _node = node; + _getRemoteDir = getRemoteDir; + _diskByName = diskByName; + } + + public RemoteDir GetRemoteDirForDisk(string diskName, ClusterConfiguration.VDisk vDisk) => + _diskByName.TryGetValue(diskName, out var disk) + ? _getRemoteDir(disk, vDisk) + : throw new ClusterStateException( + $"Disk {diskName} not found on node {_node.Name}" + ); + } +} + +public record struct ClusterState(List VDiskInfo); + +public record struct VDiskInfo( + ClusterConfiguration.VDisk VDisk, + RemoteDir[] OldDirs, + RemoteDir[] NewDirs +); diff --git a/src/ClusterModifier/Copier.cs b/src/ClusterModifier/Copier.cs new file mode 100644 index 0000000..2896050 --- /dev/null +++ b/src/ClusterModifier/Copier.cs @@ -0,0 +1,68 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using BobToolsCli.Exceptions; +using BobToolsCli.Helpers; +using Microsoft.Extensions.Logging; +using RemoteFileCopy; + +namespace ClusterModifier; + +public class Copier +{ + private readonly IRemoteFileCopier _remoteFileCopier; + private readonly ParallelP2PProcessor _parallelP2PProcessor; + private readonly ILogger _logger; + private readonly ClusterExpandArguments _args; + + public Copier( + IRemoteFileCopier remoteFileCopier, + ParallelP2PProcessor parallelP2PProcessor, + ILogger logger, + ClusterExpandArguments args + ) + { + _remoteFileCopier = remoteFileCopier; + _parallelP2PProcessor = parallelP2PProcessor; + _logger = logger; + _args = args; + } + + public async Task Copy(List copyOperations, CancellationToken cancellationToken) + { + _logger.LogInformation("Copying data from old to current replicas"); + if (!_args.DryRun) + { + var parallelOperations = copyOperations + .Select( + op => + ParallelP2PProcessor.CreateOperation( + op.From.Address, + op.To.Address, + () => Copy(op, cancellationToken) + ) + ) + .ToArray(); + await _parallelP2PProcessor.Invoke( + _args.CopyParallelDegree, + parallelOperations, + cancellationToken + ); + } + else + { + foreach (var op in copyOperations) + _logger.LogInformation("Expected copying from {From} to {To}", op.From, op.To); + } + } + + private async Task Copy(CopyOperation op, CancellationToken cancellationToken) + { + var copyResult = await _remoteFileCopier.Copy(op.From, op.To, cancellationToken); + if (copyResult.IsError) + throw new OperationException($"Failed to copy data from {op.From} to {op.To}"); + return true; + } +} diff --git a/src/ClusterModifier/Program.cs b/src/ClusterModifier/Program.cs index 61ebcde..87df698 100644 --- a/src/ClusterModifier/Program.cs +++ b/src/ClusterModifier/Program.cs @@ -1,36 +1,34 @@ -using System; -using System.Collections.Generic; -using System.Diagnostics; -using System.IO; -using System.Linq; -using System.Threading; +using System.Threading; using System.Threading.Tasks; -using BobApi.BobEntities; using BobToolsCli; -using CommandLine; -using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Logging; using RemoteFileCopy.Extensions; -namespace ClusterModifier +namespace ClusterModifier; + +public class Program { - public class Program + public static async Task Main(string[] args) { - public static async Task Main(string[] args) - { - await CliHelper.RunWithParsed(args, ExpandCluster); - } + await CliHelper.RunWithParsed(args, ExpandCluster); + } - private static async Task ExpandCluster(ClusterExpandArguments arguments, IServiceCollection services, - CancellationToken cancellationToken) - { - services.AddTransient(); - services.AddRemoteFileCopy(arguments.SshConfiguration, arguments.FilesFinderConfiguration); - using var provider = services.BuildServiceProvider(); + private static async Task ExpandCluster( + ClusterExpandArguments arguments, + IServiceCollection services, + CancellationToken cancellationToken + ) + { + services + .AddTransient() + .AddTransient() + .AddTransient() + .AddTransient() + .AddTransient(); + services.AddRemoteFileCopy(arguments.SshConfiguration, arguments.FilesFinderConfiguration); + using var provider = services.BuildServiceProvider(); - var expander = provider.GetRequiredService(); - await expander.ExpandCluster(cancellationToken); - } + var expander = provider.GetRequiredService(); + await expander.ExpandCluster(cancellationToken); } } diff --git a/src/ClusterModifier/Remover.cs b/src/ClusterModifier/Remover.cs new file mode 100644 index 0000000..908a4c9 --- /dev/null +++ b/src/ClusterModifier/Remover.cs @@ -0,0 +1,95 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using RemoteFileCopy; +using RemoteFileCopy.Entities; + +namespace ClusterModifier; + +public class Remover +{ + private readonly IRemoteFileCopier _remoteFileCopier; + private readonly ILogger _logger; + private readonly ClusterExpandArguments _args; + + public Remover( + IRemoteFileCopier remoteFileCopier, + ILogger logger, + ClusterExpandArguments args + ) + { + _remoteFileCopier = remoteFileCopier; + _logger = logger; + _args = args; + } + + public async Task RemoveConfirmed( + List operations, + CancellationToken cancellationToken + ) + { + bool noErrors = true; + foreach (var op in operations) + { + if (_args.DryRun) + _logger.LogInformation("Expected removing files from {Directory}", op.DirToDelete); + else + { + bool deleteAllowed = true; + foreach (var copy in op.Copies) + { + if ( + !await _remoteFileCopier.SourceCopiedToDest( + op.DirToDelete, + copy, + cancellationToken + ) + ) + { + noErrors = false; + _logger.LogError( + "Directories {From} and {To} contain different files, directory {From} can't be removed", + op.DirToDelete, + copy, + op.DirToDelete + ); + deleteAllowed = false; + break; + } + } + if (deleteAllowed) + { + if (await _remoteFileCopier.RemoveInDir(op.DirToDelete, cancellationToken)) + _logger.LogInformation("Removed directory {From}", op.DirToDelete); + else + { + noErrors = false; + _logger.LogError("Failed to remove directory {From}", op.DirToDelete); + } + } + } + } + return noErrors; + } + + public async Task RemoveUnconfirmed(List dirs, CancellationToken cancellationToken) + { + foreach (var dir in dirs) + { + if (_args.DryRun) + _logger.LogInformation( + "Expected removing files from {Directory} (directory has no replicas)", + dir + ); + else + { + if (await _remoteFileCopier.RemoveInDir(dir, cancellationToken)) + _logger.LogInformation("Removed directory {From}", dir); + else + _logger.LogError("Failed to remove directory {From}", dir); + } + } + } +} diff --git a/src/ClusterModifier/WorkSpecificationFinder.cs b/src/ClusterModifier/WorkSpecificationFinder.cs new file mode 100644 index 0000000..49001b4 --- /dev/null +++ b/src/ClusterModifier/WorkSpecificationFinder.cs @@ -0,0 +1,126 @@ +using System.Collections.Generic; +using System.Linq; +using System.Net; +using RemoteFileCopy.Entities; + +namespace ClusterModifier; + +public class WorkSpecificationFinder +{ + public WorkSpecification Find(ClusterState clusterState) + { + var dirsToDelete = GetDirsToDelete(clusterState.VDiskInfo); + var copyOperations = GetCopyOperations(clusterState.VDiskInfo, dirsToDelete); + var confirmedDelete = GetConfirmedDeleteOperations(copyOperations, dirsToDelete); + var unconfirmedDelete = GetUnconfirmedDeleteDirs(copyOperations, dirsToDelete); + return new WorkSpecification(copyOperations, confirmedDelete, unconfirmedDelete); + } + + private List GetCopyOperations( + List vDiskInfo, + HashSet dirsToDelete + ) + { + var sourceDirsByDest = GetSourceDirsByDestination(vDiskInfo); + return CollectOperations(sourceDirsByDest, dirsToDelete); + } + + private Dictionary> GetSourceDirsByDestination( + List vDiskInfo + ) + { + var sourceDirsByDest = new Dictionary>(); + foreach (var info in vDiskInfo) + { + var missing = info.NewDirs.Except(info.OldDirs); + foreach (var newDir in missing) + if (sourceDirsByDest.TryGetValue(newDir, out var dirs)) + dirs.UnionWith(info.OldDirs); + else + sourceDirsByDest.Add(newDir, info.OldDirs.ToHashSet()); + } + return sourceDirsByDest; + } + + private static List CollectOperations( + Dictionary> sourceDirsByDest, + HashSet dirsToDelete + ) + { + var loadCountByAddress = new Dictionary(); + var loadCountByDir = new Dictionary(); + foreach (var sources in sourceDirsByDest.Values) + foreach (var src in sources) + { + loadCountByAddress[src.Address] = 0; + loadCountByDir[src] = 0; + } + var operations = new List(); + foreach ( + var (dest, sources) in sourceDirsByDest + .OrderBy(kv => kv.Key.Address.ToString()) + .ThenBy(kv => kv.Key.Path) + ) + { + var bestSource = sources + .OrderByDescending(rd => dirsToDelete.Contains(rd) && loadCountByDir[rd] == 0) + .ThenBy(rd => loadCountByAddress[rd.Address] - (rd.Address == dest.Address ? 1 : 0)) + .ThenBy(rd => rd.Address.ToString()) + .ThenBy(rd => rd.Path) + .First(); + loadCountByAddress[bestSource.Address]++; + loadCountByDir[bestSource]++; + operations.Add(new CopyOperation(bestSource, dest)); + } + + return operations; + } + + private HashSet GetDirsToDelete(List vDiskInfo) + { + var result = new HashSet(); + foreach (var info in vDiskInfo) + { + result.UnionWith(info.OldDirs.Except(info.NewDirs)); + } + return result; + } + + private List GetConfirmedDeleteOperations( + List copyOperations, + HashSet dirsToDelete + ) + { + var result = new List(); + var copiedNewByOldDir = copyOperations + .GroupBy(t => t.From) + .ToDictionary(g => g.Key, g => g.Select(t => t.To).Distinct().ToArray()); + foreach (var oldDir in dirsToDelete) + { + if (copiedNewByOldDir.TryGetValue(oldDir, out var copiedNewDirs)) + { + result.Add(new ConfirmedDeleteOperation(oldDir, copiedNewDirs)); + } + } + return result; + } + + private List GetUnconfirmedDeleteDirs( + List copyOperations, + HashSet dirsToDelete + ) + { + var copied = copyOperations.Select(o => o.From).ToHashSet(); + return dirsToDelete.Except(copied).ToList(); + } +} + +public record struct CopyOperation(RemoteDir From, RemoteDir To); + +public record struct ConfirmedDeleteOperation(RemoteDir DirToDelete, RemoteDir[] Copies); + +public record struct WorkSpecification( + List CopyOperations, + List ConfirmedDeleteOperations, + List UnconfirmedDeleteDirs +); From a7eaecf3fde557fcb22f0d8a5db4195b4840643e Mon Sep 17 00:00:00 2001 From: Ivan Druzhitskiy Date: Wed, 27 Dec 2023 17:32:38 +0300 Subject: [PATCH 06/21] ClusterModifier: move dry run handling to expander --- src/ClusterModifier/ClusterExpander.cs | 67 +++++++++++++---- src/ClusterModifier/Copier.cs | 56 ++++++--------- src/ClusterModifier/Remover.cs | 99 ++++++++++++-------------- 3 files changed, 121 insertions(+), 101 deletions(-) diff --git a/src/ClusterModifier/ClusterExpander.cs b/src/ClusterModifier/ClusterExpander.cs index e4fe32f..2ec4392 100644 --- a/src/ClusterModifier/ClusterExpander.cs +++ b/src/ClusterModifier/ClusterExpander.cs @@ -1,5 +1,6 @@ using System.Threading; using System.Threading.Tasks; +using Microsoft.Extensions.Logging; namespace ClusterModifier; @@ -10,13 +11,15 @@ public class ClusterExpander private readonly Copier _copier; private readonly Remover _remover; private readonly ClusterExpandArguments _args; + private readonly ILogger _logger; public ClusterExpander( ClusterStateFinder clusterStateFinder, WorkSpecificationFinder workSpecificationFinder, Copier copier, Remover remover, - ClusterExpandArguments args + ClusterExpandArguments args, + ILogger logger ) { _clusterStateFinder = clusterStateFinder; @@ -24,6 +27,7 @@ ClusterExpandArguments args _copier = copier; _remover = remover; _args = args; + _logger = logger; } public async Task ExpandCluster(CancellationToken cancellationToken) @@ -31,22 +35,59 @@ public async Task ExpandCluster(CancellationToken cancellationToken) var clusterState = await _clusterStateFinder.Find(cancellationToken); var workSpecification = _workSpecificationFinder.Find(clusterState); - await _copier.Copy(workSpecification.CopyOperations, cancellationToken); + await Copy(workSpecification, cancellationToken); if (_args.RemoveUnusedReplicas) { - if ( - await _remover.RemoveConfirmed( - workSpecification.ConfirmedDeleteOperations, - cancellationToken - ) || _args.ForceRemoveUncopiedUnusedReplicas - ) - { - await _remover.RemoveUnconfirmed( - workSpecification.UnconfirmedDeleteDirs, - cancellationToken + await Remove(workSpecification, cancellationToken); + } + } + + private async Task Copy( + WorkSpecification workSpecification, + CancellationToken cancellationToken + ) + { + if (_args.DryRun) + { + foreach (var op in workSpecification.CopyOperations) + _logger.LogInformation("Expected copying from {From} to {To}", op.From, op.To); + } + else + { + _logger.LogInformation("Copying data from old to current replicas"); + await _copier.Copy( + workSpecification.CopyOperations, + _args.CopyParallelDegree, + cancellationToken + ); + } + } + + private async Task Remove( + WorkSpecification workSpecification, + CancellationToken cancellationToken + ) + { + if (_args.DryRun) + { + foreach (var op in workSpecification.ConfirmedDeleteOperations) + _logger.LogInformation( + "Expected removing directory {Dir} with checking copies", + op.DirToDelete ); - } + foreach (var dir in workSpecification.UnconfirmedDeleteDirs) + _logger.LogInformation("Expected removing directory {Dir} without checking copies"); + } + else + { + _logger.LogInformation("Removing data from obsolete replicas"); + await _remover.Remove( + workSpecification.ConfirmedDeleteOperations, + workSpecification.UnconfirmedDeleteDirs, + _args.ForceRemoveUncopiedUnusedReplicas, + cancellationToken + ); } } } diff --git a/src/ClusterModifier/Copier.cs b/src/ClusterModifier/Copier.cs index 2896050..f75e737 100644 --- a/src/ClusterModifier/Copier.cs +++ b/src/ClusterModifier/Copier.cs @@ -14,48 +14,34 @@ public class Copier { private readonly IRemoteFileCopier _remoteFileCopier; private readonly ParallelP2PProcessor _parallelP2PProcessor; - private readonly ILogger _logger; - private readonly ClusterExpandArguments _args; - public Copier( - IRemoteFileCopier remoteFileCopier, - ParallelP2PProcessor parallelP2PProcessor, - ILogger logger, - ClusterExpandArguments args - ) + public Copier(IRemoteFileCopier remoteFileCopier, ParallelP2PProcessor parallelP2PProcessor) { _remoteFileCopier = remoteFileCopier; _parallelP2PProcessor = parallelP2PProcessor; - _logger = logger; - _args = args; } - public async Task Copy(List copyOperations, CancellationToken cancellationToken) + public async Task Copy( + List copyOperations, + int copyParallelDegree, + CancellationToken cancellationToken + ) { - _logger.LogInformation("Copying data from old to current replicas"); - if (!_args.DryRun) - { - var parallelOperations = copyOperations - .Select( - op => - ParallelP2PProcessor.CreateOperation( - op.From.Address, - op.To.Address, - () => Copy(op, cancellationToken) - ) - ) - .ToArray(); - await _parallelP2PProcessor.Invoke( - _args.CopyParallelDegree, - parallelOperations, - cancellationToken - ); - } - else - { - foreach (var op in copyOperations) - _logger.LogInformation("Expected copying from {From} to {To}", op.From, op.To); - } + var parallelOperations = copyOperations + .Select( + op => + ParallelP2PProcessor.CreateOperation( + op.From.Address, + op.To.Address, + () => Copy(op, cancellationToken) + ) + ) + .ToArray(); + await _parallelP2PProcessor.Invoke( + copyParallelDegree, + parallelOperations, + cancellationToken + ); } private async Task Copy(CopyOperation op, CancellationToken cancellationToken) diff --git a/src/ClusterModifier/Remover.cs b/src/ClusterModifier/Remover.cs index 908a4c9..6d899f0 100644 --- a/src/ClusterModifier/Remover.cs +++ b/src/ClusterModifier/Remover.cs @@ -1,5 +1,4 @@ -using System; -using System.Collections.Generic; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Logging; @@ -12,20 +11,27 @@ public class Remover { private readonly IRemoteFileCopier _remoteFileCopier; private readonly ILogger _logger; - private readonly ClusterExpandArguments _args; - public Remover( - IRemoteFileCopier remoteFileCopier, - ILogger logger, - ClusterExpandArguments args - ) + public Remover(IRemoteFileCopier remoteFileCopier, ILogger logger) { _remoteFileCopier = remoteFileCopier; _logger = logger; - _args = args; } - public async Task RemoveConfirmed( + public async Task Remove( + List confirmed, + List unconfirmed, + bool forceRemoveUnconfirmed, + CancellationToken cancellationToken + ) + { + if (await RemoveConfirmed(confirmed, cancellationToken) || forceRemoveUnconfirmed) + { + await RemoveUnconfirmed(unconfirmed, cancellationToken); + } + } + + private async Task RemoveConfirmed( List operations, CancellationToken cancellationToken ) @@ -33,63 +39,50 @@ CancellationToken cancellationToken bool noErrors = true; foreach (var op in operations) { - if (_args.DryRun) - _logger.LogInformation("Expected removing files from {Directory}", op.DirToDelete); - else + bool deleteAllowed = true; + foreach (var copy in op.Copies) { - bool deleteAllowed = true; - foreach (var copy in op.Copies) - { - if ( - !await _remoteFileCopier.SourceCopiedToDest( - op.DirToDelete, - copy, - cancellationToken - ) + if ( + !await _remoteFileCopier.SourceCopiedToDest( + op.DirToDelete, + copy, + cancellationToken ) - { - noErrors = false; - _logger.LogError( - "Directories {From} and {To} contain different files, directory {From} can't be removed", - op.DirToDelete, - copy, - op.DirToDelete - ); - deleteAllowed = false; - break; - } + ) + { + noErrors = false; + _logger.LogError( + "Directories {From} and {To} contain different files, directory {From} can't be removed", + op.DirToDelete, + copy, + op.DirToDelete + ); + deleteAllowed = false; + break; } - if (deleteAllowed) + } + if (deleteAllowed) + { + if (await _remoteFileCopier.RemoveInDir(op.DirToDelete, cancellationToken)) + _logger.LogInformation("Removed directory {From}", op.DirToDelete); + else { - if (await _remoteFileCopier.RemoveInDir(op.DirToDelete, cancellationToken)) - _logger.LogInformation("Removed directory {From}", op.DirToDelete); - else - { - noErrors = false; - _logger.LogError("Failed to remove directory {From}", op.DirToDelete); - } + noErrors = false; + _logger.LogError("Failed to remove directory {From}", op.DirToDelete); } } } return noErrors; } - public async Task RemoveUnconfirmed(List dirs, CancellationToken cancellationToken) + private async Task RemoveUnconfirmed(List dirs, CancellationToken cancellationToken) { foreach (var dir in dirs) { - if (_args.DryRun) - _logger.LogInformation( - "Expected removing files from {Directory} (directory has no replicas)", - dir - ); + if (await _remoteFileCopier.RemoveInDir(dir, cancellationToken)) + _logger.LogInformation("Removed directory {From}", dir); else - { - if (await _remoteFileCopier.RemoveInDir(dir, cancellationToken)) - _logger.LogInformation("Removed directory {From}", dir); - else - _logger.LogError("Failed to remove directory {From}", dir); - } + _logger.LogError("Failed to remove directory {From}", dir); } } } From 75b0fc825d1c81883fa72b19d3f14ed56ae94cdc Mon Sep 17 00:00:00 2001 From: Ivan Druzhitskiy Date: Wed, 27 Dec 2023 17:49:20 +0300 Subject: [PATCH 07/21] ClusterModifier: move root dir finding to args --- src/ClusterModifier/ClusterExpandArguments.cs | 28 ++++++++++++++++-- src/ClusterModifier/ClusterStateFinder.cs | 29 +------------------ 2 files changed, 27 insertions(+), 30 deletions(-) diff --git a/src/ClusterModifier/ClusterExpandArguments.cs b/src/ClusterModifier/ClusterExpandArguments.cs index bee8c81..51681bc 100644 --- a/src/ClusterModifier/ClusterExpandArguments.cs +++ b/src/ClusterModifier/ClusterExpandArguments.cs @@ -1,5 +1,8 @@ using System.Collections.Generic; using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using BobApi.BobEntities; using BobToolsCli; using BobToolsCli.Exceptions; using CommandLine; @@ -27,7 +30,28 @@ public class ClusterExpandArguments : CommonWithSshArguments [Option("copy-parallel-degree", HelpText = "Number of simultaneous copy processes", Default = 1)] public int CopyParallelDegree { get; set; } - public string FindRootDir(string node) + public async ValueTask GetRootDir( + ClusterConfiguration.Node node, + CancellationToken cancellationToken = default + ) + { + var rootDir = FindRootDirOverride(node.Name); + if (rootDir == null) + { + var client = GetBobApiClientProvider().GetClient(node); + var nodeConfigResult = await client.GetNodeConfiguration(cancellationToken); + if (nodeConfigResult.IsOk(out var conf, out var error)) + rootDir = conf.RootDir; + else + throw new ClusterStateException( + $"Node {node.Name} configuration is unavailable: {error}, " + + "and bob-root-dir does not contain enough information" + ); + } + return rootDir; + } + + private string FindRootDirOverride(string nodeName) { if (BobRootDirOverrides.Any()) { @@ -37,7 +61,7 @@ public string FindRootDir(string node) if (split.Length != 2) throw new ConfigurationException("Malformed overrides argument"); - if (split[0] == node || split[0] == "*") + if (split[0] == nodeName || split[0] == "*") return split[1]; } } diff --git a/src/ClusterModifier/ClusterStateFinder.cs b/src/ClusterModifier/ClusterStateFinder.cs index b42dce3..64084f5 100644 --- a/src/ClusterModifier/ClusterStateFinder.cs +++ b/src/ClusterModifier/ClusterStateFinder.cs @@ -42,12 +42,6 @@ private async Task> GetVDiskInfo(CancellationToken cancellationT if (!configResult.IsOk(out var config, out var newError)) throw new ConfigurationException($"Current config is not available: {newError}"); - _logger.LogDebug( - "Expanding cluster from {OldConfigPath} to {CurrentConfigPath}", - _args.OldConfigPath, - _args.ClusterConfigPath - ); - return await GetVDiskInfo(oldConfig, config, cancellationToken); } @@ -106,32 +100,11 @@ private async ValueTask GetCreator( ) { var addr = await node.FindIPAddress(); - var rootDir = await GetRootDir(node, cancellationToken); + var rootDir = await _args.GetRootDir(node, cancellationToken); return (disk, vdisk) => new RemoteDir(addr, Path.Combine(disk.Path, rootDir, vdisk.Id.ToString())); } - private async ValueTask GetRootDir( - ClusterConfiguration.Node node, - CancellationToken cancellationToken = default - ) - { - var rootDir = _args.FindRootDir(node.Name); - if (rootDir == null) - { - var client = _args.GetBobApiClientProvider().GetClient(node); - var nodeConfigResult = await client.GetNodeConfiguration(cancellationToken); - if (nodeConfigResult.IsOk(out var conf, out var error)) - rootDir = conf.RootDir; - else - throw new ClusterStateException( - $"Node {node.Name} configuration is unavailable: {error}, " - + "and bob-root-dir does not contain enough information" - ); - } - return rootDir; - } - private readonly struct NodeInfo { private readonly GetRemoteDir _getRemoteDir; From 9f629155637cfda8f80586122e34be089301d66e Mon Sep 17 00:00:00 2001 From: Ivan Druzhitskiy Date: Wed, 3 Jan 2024 20:07:10 +0300 Subject: [PATCH 08/21] ClusterModifier: factor out directories finding --- src/ClusterModifier/ClusterStateFinder.cs | 112 +++++++----------- .../NodeDiskRemoteDirsFinder.cs | 47 ++++++++ src/ClusterModifier/Program.cs | 1 + src/RemoteFileCopy/Entities/RemoteDir.cs | 6 +- 4 files changed, 92 insertions(+), 74 deletions(-) create mode 100644 src/ClusterModifier/NodeDiskRemoteDirsFinder.cs diff --git a/src/ClusterModifier/ClusterStateFinder.cs b/src/ClusterModifier/ClusterStateFinder.cs index 64084f5..efa9bc4 100644 --- a/src/ClusterModifier/ClusterStateFinder.cs +++ b/src/ClusterModifier/ClusterStateFinder.cs @@ -1,6 +1,4 @@ -using System; -using System.Collections.Generic; -using System.IO; +using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -14,11 +12,17 @@ namespace ClusterModifier; public class ClusterStateFinder { private readonly ClusterExpandArguments _args; + private readonly NodeDiskRemoteDirsFinder _nodeDirsRemoteDirsFinder; private readonly ILogger _logger; - public ClusterStateFinder(ClusterExpandArguments args, ILogger logger) + public ClusterStateFinder( + ClusterExpandArguments args, + NodeDiskRemoteDirsFinder nodeDirsRemoteDirsFinder, + ILogger logger + ) { _args = args; + _nodeDirsRemoteDirsFinder = nodeDirsRemoteDirsFinder; _logger = logger; } @@ -51,83 +55,47 @@ private async Task> GetVDiskInfo( CancellationToken cancellationToken ) { - var vDiskPairs = oldConfig.VDisks.Join( - config.VDisks, - vd => vd.Id, - vd => vd.Id, - (ovd, vd) => (ovd, vd) + var vDiskPairs = oldConfig + .VDisks + .Join(config.VDisks, vd => vd.Id, vd => vd.Id, (ovd, vd) => (ovd, vd)); + var oldRemoteDirByDiskByNode = await _nodeDirsRemoteDirsFinder.FindRemoteDirByDiskByNode( + oldConfig, + cancellationToken + ); + var newRemoteDirByDiskByNode = await _nodeDirsRemoteDirsFinder.FindRemoteDirByDiskByNode( + config, + cancellationToken ); - var oldNodeInfoByName = await GetNodeInfoByName(oldConfig, cancellationToken); - var nodeInfoByName = await GetNodeInfoByName(config, cancellationToken); var result = new List(); - foreach (var (oldVDisk, vDisk) in vDiskPairs) + RemoteDir GetOldDir(string node, string disk, long vDisk) { - var oldDirs = oldVDisk.Replicas.Select( - r => oldNodeInfoByName[r.Node].GetRemoteDirForDisk(r.Disk, oldVDisk) - ); - var newDirs = vDisk.Replicas.Select( - r => nodeInfoByName[r.Node].GetRemoteDirForDisk(r.Disk, vDisk) + if ( + oldRemoteDirByDiskByNode.TryGetValue(node, out var d) + && d.TryGetValue(disk, out var rd) + ) + return rd.GetSubdir(vDisk.ToString()); + throw new ClusterStateException( + $"Disk {disk} not found on node {node} in old cluster config" ); - result.Add(new VDiskInfo(vDisk, oldDirs.ToArray(), newDirs.ToArray())); } - return result; - } - - private async Task> GetNodeInfoByName( - ClusterConfiguration config, - CancellationToken cancellationToken - ) - { - var nodeInfoByName = new Dictionary(); - foreach (var node in config.Nodes) + RemoteDir GetNewDir(string node, string disk, long vDisk) { - var cr = await GetCreator(node, cancellationToken); - var disks = node.Disks.ToDictionary(d => d.Name); - nodeInfoByName.Add(node.Name, new NodeInfo(node, cr, disks)); + if ( + newRemoteDirByDiskByNode.TryGetValue(node, out var d) + && d.TryGetValue(disk, out var rd) + ) + return rd.GetSubdir(vDisk.ToString()); + throw new ClusterStateException( + $"Disk {disk} not found on node {node} in new cluster config" + ); } - - return nodeInfoByName; - } - - private delegate RemoteDir GetRemoteDir( - ClusterConfiguration.Node.Disk disk, - ClusterConfiguration.VDisk vDisk - ); - - private async ValueTask GetCreator( - ClusterConfiguration.Node node, - CancellationToken cancellationToken = default - ) - { - var addr = await node.FindIPAddress(); - var rootDir = await _args.GetRootDir(node, cancellationToken); - return (disk, vdisk) => - new RemoteDir(addr, Path.Combine(disk.Path, rootDir, vdisk.Id.ToString())); - } - - private readonly struct NodeInfo - { - private readonly GetRemoteDir _getRemoteDir; - private readonly Dictionary _diskByName; - private readonly ClusterConfiguration.Node _node; - - public NodeInfo( - ClusterConfiguration.Node node, - GetRemoteDir getRemoteDir, - Dictionary diskByName - ) + foreach (var (oldVDisk, vDisk) in vDiskPairs) { - _node = node; - _getRemoteDir = getRemoteDir; - _diskByName = diskByName; + var oldDirs = oldVDisk.Replicas.Select(r => GetOldDir(r.Node, r.Disk, oldVDisk.Id)); + var newDirs = vDisk.Replicas.Select(r => GetNewDir(r.Node, r.Disk, vDisk.Id)); + result.Add(new VDiskInfo(vDisk, oldDirs.ToArray(), newDirs.ToArray())); } - - public RemoteDir GetRemoteDirForDisk(string diskName, ClusterConfiguration.VDisk vDisk) => - _diskByName.TryGetValue(diskName, out var disk) - ? _getRemoteDir(disk, vDisk) - : throw new ClusterStateException( - $"Disk {diskName} not found on node {_node.Name}" - ); + return result; } } diff --git a/src/ClusterModifier/NodeDiskRemoteDirsFinder.cs b/src/ClusterModifier/NodeDiskRemoteDirsFinder.cs new file mode 100644 index 0000000..335bd8f --- /dev/null +++ b/src/ClusterModifier/NodeDiskRemoteDirsFinder.cs @@ -0,0 +1,47 @@ +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using BobApi.BobEntities; +using RemoteFileCopy.Entities; + +namespace ClusterModifier; + +public class NodeDiskRemoteDirsFinder +{ + private readonly ClusterExpandArguments _args; + + public NodeDiskRemoteDirsFinder(ClusterExpandArguments args) + { + _args = args; + } + + public async Task>> FindRemoteDirByDiskByNode( + ClusterConfiguration config, + CancellationToken cancellationToken + ) + { + var result = new Dictionary>(); + foreach (var node in config.Nodes) + { + var remoteDirByDisk = await GetRemoteDirByDisk(node, cancellationToken); + result.Add(node.Name, remoteDirByDisk); + } + + return result; + } + + private async ValueTask> GetRemoteDirByDisk( + ClusterConfiguration.Node node, + CancellationToken cancellationToken + ) + { + var addr = await node.FindIPAddress(); + var rootDir = await _args.GetRootDir(node, cancellationToken); + return node.Disks.ToDictionary( + d => d.Name, + d => new RemoteDir(addr, Path.Combine(d.Path, rootDir)) + ); + } +} diff --git a/src/ClusterModifier/Program.cs b/src/ClusterModifier/Program.cs index 87df698..383ef58 100644 --- a/src/ClusterModifier/Program.cs +++ b/src/ClusterModifier/Program.cs @@ -22,6 +22,7 @@ CancellationToken cancellationToken services .AddTransient() .AddTransient() + .AddTransient() .AddTransient() .AddTransient() .AddTransient(); diff --git a/src/RemoteFileCopy/Entities/RemoteDir.cs b/src/RemoteFileCopy/Entities/RemoteDir.cs index 25be7c1..5191575 100644 --- a/src/RemoteFileCopy/Entities/RemoteDir.cs +++ b/src/RemoteFileCopy/Entities/RemoteDir.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Net; namespace RemoteFileCopy.Entities @@ -14,6 +14,8 @@ public RemoteDir(IPAddress address, string path) public IPAddress Address { get; } public string Path { get; } + public RemoteDir GetSubdir(string dir) => new(Address, System.IO.Path.Combine(Path, dir)); + public override string ToString() { return $"{Address}:{Path}"; @@ -36,4 +38,4 @@ public override int GetHashCode() return HashCode.Combine(Address, Path); } } -} \ No newline at end of file +} From 33440084a6e22fa6b6f5cc5b978b04c822a1eda6 Mon Sep 17 00:00:00 2001 From: Ivan Druzhitskiy Date: Thu, 4 Jan 2024 17:03:53 +0300 Subject: [PATCH 09/21] ClusterModifier: create interfaces for implementing test mode --- src/ClusterModifier/ClusterExpander.cs | 71 ++------------- src/ClusterModifier/ClusterStateAlterer.cs | 88 +++++++++++++++++++ src/ClusterModifier/ClusterStateFinder.cs | 84 +++++++----------- src/ClusterModifier/IConfigurationsFinder.cs | 12 +++ src/ClusterModifier/ICopier.cs | 11 +++ .../INodeDiskRemoteDirsFinder.cs | 13 +++ src/ClusterModifier/IRemover.cs | 12 +++ .../Implementations/ConfigurationsFinder.cs | 37 ++++++++ .../{ => Implementations}/Copier.cs | 6 +- .../NodeDiskRemoteDirsFinder.cs | 2 +- .../{ => Implementations}/Remover.cs | 2 +- src/ClusterModifier/Program.cs | 7 +- 12 files changed, 221 insertions(+), 124 deletions(-) create mode 100644 src/ClusterModifier/ClusterStateAlterer.cs create mode 100644 src/ClusterModifier/IConfigurationsFinder.cs create mode 100644 src/ClusterModifier/ICopier.cs create mode 100644 src/ClusterModifier/INodeDiskRemoteDirsFinder.cs create mode 100644 src/ClusterModifier/IRemover.cs create mode 100644 src/ClusterModifier/Implementations/ConfigurationsFinder.cs rename src/ClusterModifier/{ => Implementations}/Copier.cs (93%) rename src/ClusterModifier/{ => Implementations}/NodeDiskRemoteDirsFinder.cs (95%) rename src/ClusterModifier/{ => Implementations}/Remover.cs (98%) diff --git a/src/ClusterModifier/ClusterExpander.cs b/src/ClusterModifier/ClusterExpander.cs index 2ec4392..fdf8526 100644 --- a/src/ClusterModifier/ClusterExpander.cs +++ b/src/ClusterModifier/ClusterExpander.cs @@ -8,86 +8,29 @@ public class ClusterExpander { private readonly ClusterStateFinder _clusterStateFinder; private readonly WorkSpecificationFinder _workSpecificationFinder; - private readonly Copier _copier; - private readonly Remover _remover; + private readonly ClusterStateAlterer _clusterStateAlterer; + private readonly ICopier _copier; + private readonly IRemover _remover; private readonly ClusterExpandArguments _args; private readonly ILogger _logger; public ClusterExpander( ClusterStateFinder clusterStateFinder, WorkSpecificationFinder workSpecificationFinder, - Copier copier, - Remover remover, - ClusterExpandArguments args, - ILogger logger + ClusterStateAlterer clusterStateAlterer ) { _clusterStateFinder = clusterStateFinder; _workSpecificationFinder = workSpecificationFinder; - _copier = copier; - _remover = remover; - _args = args; - _logger = logger; + _clusterStateAlterer = clusterStateAlterer; } public async Task ExpandCluster(CancellationToken cancellationToken) { var clusterState = await _clusterStateFinder.Find(cancellationToken); - var workSpecification = _workSpecificationFinder.Find(clusterState); - - await Copy(workSpecification, cancellationToken); - if (_args.RemoveUnusedReplicas) - { - await Remove(workSpecification, cancellationToken); - } - } - - private async Task Copy( - WorkSpecification workSpecification, - CancellationToken cancellationToken - ) - { - if (_args.DryRun) - { - foreach (var op in workSpecification.CopyOperations) - _logger.LogInformation("Expected copying from {From} to {To}", op.From, op.To); - } - else - { - _logger.LogInformation("Copying data from old to current replicas"); - await _copier.Copy( - workSpecification.CopyOperations, - _args.CopyParallelDegree, - cancellationToken - ); - } - } + var workSpecification = _workSpecificationFinder.Find(clusterState); - private async Task Remove( - WorkSpecification workSpecification, - CancellationToken cancellationToken - ) - { - if (_args.DryRun) - { - foreach (var op in workSpecification.ConfirmedDeleteOperations) - _logger.LogInformation( - "Expected removing directory {Dir} with checking copies", - op.DirToDelete - ); - foreach (var dir in workSpecification.UnconfirmedDeleteDirs) - _logger.LogInformation("Expected removing directory {Dir} without checking copies"); - } - else - { - _logger.LogInformation("Removing data from obsolete replicas"); - await _remover.Remove( - workSpecification.ConfirmedDeleteOperations, - workSpecification.UnconfirmedDeleteDirs, - _args.ForceRemoveUncopiedUnusedReplicas, - cancellationToken - ); - } + await _clusterStateAlterer.Alter(workSpecification, cancellationToken); } } diff --git a/src/ClusterModifier/ClusterStateAlterer.cs b/src/ClusterModifier/ClusterStateAlterer.cs new file mode 100644 index 0000000..9f4b260 --- /dev/null +++ b/src/ClusterModifier/ClusterStateAlterer.cs @@ -0,0 +1,88 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; + +namespace ClusterModifier; + +public class ClusterStateAlterer +{ + private readonly ClusterExpandArguments _args; + private readonly ICopier _copier; + private readonly IRemover _remover; + private readonly ILogger _logger; + + public ClusterStateAlterer( + ClusterExpandArguments args, + ICopier copier, + IRemover remover, + ILogger logger + ) + { + _args = args; + _copier = copier; + _remover = remover; + _logger = logger; + } + + public async Task Alter( + WorkSpecification workSpecification, + CancellationToken cancellationToken + ) + { + await Copy(workSpecification, cancellationToken); + + if (_args.RemoveUnusedReplicas) + { + await Remove(workSpecification, cancellationToken); + } + } + + private async Task Copy( + WorkSpecification workSpecification, + CancellationToken cancellationToken + ) + { + if (_args.DryRun) + { + foreach (var op in workSpecification.CopyOperations) + _logger.LogInformation("Expected copying from {From} to {To}", op.From, op.To); + } + else + { + _logger.LogInformation("Copying data from old to current replicas"); + await _copier.Copy( + workSpecification.CopyOperations, + _args.CopyParallelDegree, + cancellationToken + ); + } + } + + private async Task Remove( + WorkSpecification workSpecification, + CancellationToken cancellationToken + ) + { + if (_args.DryRun) + { + foreach (var op in workSpecification.ConfirmedDeleteOperations) + _logger.LogInformation( + "Expected removing directory {Dir} with checking copies", + op.DirToDelete + ); + foreach (var dir in workSpecification.UnconfirmedDeleteDirs) + _logger.LogInformation("Expected removing directory {Dir} without checking copies"); + } + else + { + _logger.LogInformation("Removing data from obsolete replicas"); + await _remover.Remove( + workSpecification.ConfirmedDeleteOperations, + workSpecification.UnconfirmedDeleteDirs, + _args.ForceRemoveUncopiedUnusedReplicas, + cancellationToken + ); + } + } +} diff --git a/src/ClusterModifier/ClusterStateFinder.cs b/src/ClusterModifier/ClusterStateFinder.cs index efa9bc4..ac8bf6c 100644 --- a/src/ClusterModifier/ClusterStateFinder.cs +++ b/src/ClusterModifier/ClusterStateFinder.cs @@ -1,29 +1,26 @@ -using System.Collections.Generic; +using System; +using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; using BobApi.BobEntities; using BobToolsCli.Exceptions; -using Microsoft.Extensions.Logging; using RemoteFileCopy.Entities; namespace ClusterModifier; public class ClusterStateFinder { - private readonly ClusterExpandArguments _args; - private readonly NodeDiskRemoteDirsFinder _nodeDirsRemoteDirsFinder; - private readonly ILogger _logger; + private readonly INodeDiskRemoteDirsFinder _nodeDirsRemoteDirsFinder; + private readonly IConfigurationsFinder _configurationsFinder; public ClusterStateFinder( - ClusterExpandArguments args, - NodeDiskRemoteDirsFinder nodeDirsRemoteDirsFinder, - ILogger logger + INodeDiskRemoteDirsFinder nodeDirsRemoteDirsFinder, + IConfigurationsFinder configurationsFinder ) { - _args = args; _nodeDirsRemoteDirsFinder = nodeDirsRemoteDirsFinder; - _logger = logger; + _configurationsFinder = configurationsFinder; } public async Task Find(CancellationToken cancellationToken) @@ -33,18 +30,8 @@ public async Task Find(CancellationToken cancellationToken) private async Task> GetVDiskInfo(CancellationToken cancellationToken) { - var oldConfigResult = await _args.GetClusterConfigurationFromFile( - _args.OldConfigPath, - cancellationToken - ); - if (!oldConfigResult.IsOk(out var oldConfig, out var oldError)) - throw new ConfigurationException($"Old config is not available: {oldError}"); - - var configResult = await _args.FindClusterConfiguration( - cancellationToken: cancellationToken - ); - if (!configResult.IsOk(out var config, out var newError)) - throw new ConfigurationException($"Current config is not available: {newError}"); + var oldConfig = await _configurationsFinder.FindOldConfig(cancellationToken); + var config = await _configurationsFinder.FindNewConfig(cancellationToken); return await GetVDiskInfo(oldConfig, config, cancellationToken); } @@ -58,44 +45,39 @@ CancellationToken cancellationToken var vDiskPairs = oldConfig .VDisks .Join(config.VDisks, vd => vd.Id, vd => vd.Id, (ovd, vd) => (ovd, vd)); - var oldRemoteDirByDiskByNode = await _nodeDirsRemoteDirsFinder.FindRemoteDirByDiskByNode( - oldConfig, - cancellationToken - ); - var newRemoteDirByDiskByNode = await _nodeDirsRemoteDirsFinder.FindRemoteDirByDiskByNode( - config, - cancellationToken - ); + var findOldDir = await GetRemoteDirFinder(oldConfig, "old", cancellationToken); + var findNewDir = await GetRemoteDirFinder(config, "new", cancellationToken); var result = new List(); - RemoteDir GetOldDir(string node, string disk, long vDisk) + foreach (var (oldVDisk, vDisk) in vDiskPairs) { - if ( - oldRemoteDirByDiskByNode.TryGetValue(node, out var d) - && d.TryGetValue(disk, out var rd) - ) - return rd.GetSubdir(vDisk.ToString()); - throw new ClusterStateException( - $"Disk {disk} not found on node {node} in old cluster config" - ); + var oldDirs = oldVDisk.Replicas.Select(r => findOldDir(r.Node, r.Disk, oldVDisk.Id)); + var newDirs = vDisk.Replicas.Select(r => findNewDir(r.Node, r.Disk, vDisk.Id)); + result.Add(new VDiskInfo(vDisk, oldDirs.ToArray(), newDirs.ToArray())); } - RemoteDir GetNewDir(string node, string disk, long vDisk) + return result; + } + + private async Task> GetRemoteDirFinder( + ClusterConfiguration config, + string clusterConfigName, + CancellationToken cancellationToken + ) + { + var remoteDirByDiskByNode = await _nodeDirsRemoteDirsFinder.FindRemoteDirByDiskByNode( + config, + cancellationToken + ); + return (node, disk, vDiskId) => { if ( - newRemoteDirByDiskByNode.TryGetValue(node, out var d) + remoteDirByDiskByNode.TryGetValue(node, out var d) && d.TryGetValue(disk, out var rd) ) - return rd.GetSubdir(vDisk.ToString()); + return rd.GetSubdir(vDiskId.ToString()); throw new ClusterStateException( - $"Disk {disk} not found on node {node} in new cluster config" + $"Disk {disk} not found on node {node} in cluster config \"{clusterConfigName}\"" ); - } - foreach (var (oldVDisk, vDisk) in vDiskPairs) - { - var oldDirs = oldVDisk.Replicas.Select(r => GetOldDir(r.Node, r.Disk, oldVDisk.Id)); - var newDirs = vDisk.Replicas.Select(r => GetNewDir(r.Node, r.Disk, vDisk.Id)); - result.Add(new VDiskInfo(vDisk, oldDirs.ToArray(), newDirs.ToArray())); - } - return result; + }; } } diff --git a/src/ClusterModifier/IConfigurationsFinder.cs b/src/ClusterModifier/IConfigurationsFinder.cs new file mode 100644 index 0000000..a6ef137 --- /dev/null +++ b/src/ClusterModifier/IConfigurationsFinder.cs @@ -0,0 +1,12 @@ +using System.Threading; +using System.Threading.Tasks; +using BobApi.BobEntities; + +namespace ClusterModifier; + +public interface IConfigurationsFinder +{ + Task FindNewConfig(CancellationToken cancellationToken); + Task FindOldConfig(CancellationToken cancellationToken); +} + diff --git a/src/ClusterModifier/ICopier.cs b/src/ClusterModifier/ICopier.cs new file mode 100644 index 0000000..7f88946 --- /dev/null +++ b/src/ClusterModifier/ICopier.cs @@ -0,0 +1,11 @@ +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + +namespace ClusterModifier; + +public interface ICopier +{ + Task Copy(List copyOperations, int copyParallelDegree, CancellationToken cancellationToken); +} + diff --git a/src/ClusterModifier/INodeDiskRemoteDirsFinder.cs b/src/ClusterModifier/INodeDiskRemoteDirsFinder.cs new file mode 100644 index 0000000..6537815 --- /dev/null +++ b/src/ClusterModifier/INodeDiskRemoteDirsFinder.cs @@ -0,0 +1,13 @@ +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using BobApi.BobEntities; +using RemoteFileCopy.Entities; + +namespace ClusterModifier; + +public interface INodeDiskRemoteDirsFinder +{ + Task>> FindRemoteDirByDiskByNode(ClusterConfiguration config, CancellationToken cancellationToken); +} + diff --git a/src/ClusterModifier/IRemover.cs b/src/ClusterModifier/IRemover.cs new file mode 100644 index 0000000..974a7ad --- /dev/null +++ b/src/ClusterModifier/IRemover.cs @@ -0,0 +1,12 @@ +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using RemoteFileCopy.Entities; + +namespace ClusterModifier; + +public interface IRemover +{ + Task Remove(List confirmed, List unconfirmed, bool forceRemoveUnconfirmed, CancellationToken cancellationToken); +} + diff --git a/src/ClusterModifier/Implementations/ConfigurationsFinder.cs b/src/ClusterModifier/Implementations/ConfigurationsFinder.cs new file mode 100644 index 0000000..ee1b415 --- /dev/null +++ b/src/ClusterModifier/Implementations/ConfigurationsFinder.cs @@ -0,0 +1,37 @@ +using System.Threading; +using System.Threading.Tasks; +using BobApi.BobEntities; +using BobToolsCli.Exceptions; + +namespace ClusterModifier; + +public class ConfigurationsFinder : IConfigurationsFinder +{ + private readonly ClusterExpandArguments _args; + + public ConfigurationsFinder(ClusterExpandArguments args) + { + _args = args; + } + + public async Task FindOldConfig(CancellationToken cancellationToken) + { + var oldConfigResult = await _args.GetClusterConfigurationFromFile( + _args.OldConfigPath, + cancellationToken + ); + if (!oldConfigResult.IsOk(out var oldConfig, out var oldError)) + throw new ConfigurationException($"Old config is not available: {oldError}"); + return oldConfig; + } + + public async Task FindNewConfig(CancellationToken cancellationToken) + { + var configResult = await _args.FindClusterConfiguration( + cancellationToken: cancellationToken + ); + if (!configResult.IsOk(out var config, out var newError)) + throw new ConfigurationException($"Current config is not available: {newError}"); + return config; + } +} diff --git a/src/ClusterModifier/Copier.cs b/src/ClusterModifier/Implementations/Copier.cs similarity index 93% rename from src/ClusterModifier/Copier.cs rename to src/ClusterModifier/Implementations/Copier.cs index f75e737..5258037 100644 --- a/src/ClusterModifier/Copier.cs +++ b/src/ClusterModifier/Implementations/Copier.cs @@ -1,16 +1,14 @@ -using System; -using System.Collections.Generic; +using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; using BobToolsCli.Exceptions; using BobToolsCli.Helpers; -using Microsoft.Extensions.Logging; using RemoteFileCopy; namespace ClusterModifier; -public class Copier +public class Copier : ICopier { private readonly IRemoteFileCopier _remoteFileCopier; private readonly ParallelP2PProcessor _parallelP2PProcessor; diff --git a/src/ClusterModifier/NodeDiskRemoteDirsFinder.cs b/src/ClusterModifier/Implementations/NodeDiskRemoteDirsFinder.cs similarity index 95% rename from src/ClusterModifier/NodeDiskRemoteDirsFinder.cs rename to src/ClusterModifier/Implementations/NodeDiskRemoteDirsFinder.cs index 335bd8f..85d808f 100644 --- a/src/ClusterModifier/NodeDiskRemoteDirsFinder.cs +++ b/src/ClusterModifier/Implementations/NodeDiskRemoteDirsFinder.cs @@ -8,7 +8,7 @@ namespace ClusterModifier; -public class NodeDiskRemoteDirsFinder +public class NodeDiskRemoteDirsFinder : INodeDiskRemoteDirsFinder { private readonly ClusterExpandArguments _args; diff --git a/src/ClusterModifier/Remover.cs b/src/ClusterModifier/Implementations/Remover.cs similarity index 98% rename from src/ClusterModifier/Remover.cs rename to src/ClusterModifier/Implementations/Remover.cs index 6d899f0..0226de6 100644 --- a/src/ClusterModifier/Remover.cs +++ b/src/ClusterModifier/Implementations/Remover.cs @@ -7,7 +7,7 @@ namespace ClusterModifier; -public class Remover +public class Remover : IRemover { private readonly IRemoteFileCopier _remoteFileCopier; private readonly ILogger _logger; diff --git a/src/ClusterModifier/Program.cs b/src/ClusterModifier/Program.cs index 383ef58..1e72b48 100644 --- a/src/ClusterModifier/Program.cs +++ b/src/ClusterModifier/Program.cs @@ -22,10 +22,11 @@ CancellationToken cancellationToken services .AddTransient() .AddTransient() - .AddTransient() .AddTransient() - .AddTransient() - .AddTransient(); + .AddTransient() + .AddTransient() + .AddTransient() + .AddTransient(); services.AddRemoteFileCopy(arguments.SshConfiguration, arguments.FilesFinderConfiguration); using var provider = services.BuildServiceProvider(); From 7a392e896463946a1a428b108f418cdbb55957d1 Mon Sep 17 00:00:00 2001 From: Ivan Druzhitskiy Date: Thu, 4 Jan 2024 17:23:12 +0300 Subject: [PATCH 10/21] ClusterModifier: add testmode --- .../ConfigurationReadingResult.cs | 8 +- src/ClusterModifier/ClusterExpandArguments.cs | 3 + src/ClusterModifier/Program.cs | 24 ++++- .../CommonImplementation.cs | 98 +++++++++++++++++++ 4 files changed, 127 insertions(+), 6 deletions(-) create mode 100644 src/ClusterModifier/TestModeImplementations/CommonImplementation.cs diff --git a/src/BobToolsCli/ConfigurationReading/ConfigurationReadingResult.cs b/src/BobToolsCli/ConfigurationReading/ConfigurationReadingResult.cs index 615b6d8..05a431c 100644 --- a/src/BobToolsCli/ConfigurationReading/ConfigurationReadingResult.cs +++ b/src/BobToolsCli/ConfigurationReading/ConfigurationReadingResult.cs @@ -1,4 +1,5 @@ -using System; +using System; +using BobToolsCli.Exceptions; namespace BobToolsCli.ConfigurationReading { @@ -27,7 +28,10 @@ public ConfigurationReadingResult Map(Func f) return ConfigurationReadingResult.Ok(f(_data)); } + public T Unwrap() => IsOk(out var d, out var e) ? d : throw new ConfigurationException(e); + public static ConfigurationReadingResult Ok(T data) => new(data, null); + public static ConfigurationReadingResult Error(string error) => new(default, error); } -} \ No newline at end of file +} diff --git a/src/ClusterModifier/ClusterExpandArguments.cs b/src/ClusterModifier/ClusterExpandArguments.cs index 51681bc..be33391 100644 --- a/src/ClusterModifier/ClusterExpandArguments.cs +++ b/src/ClusterModifier/ClusterExpandArguments.cs @@ -18,6 +18,9 @@ public class ClusterExpandArguments : CommonWithSshArguments [Option("dry-run", Required = false, HelpText = "Do not copy anything")] public bool DryRun { get; set; } = false; + [Option("dry-run", Required = false, HelpText = "Special testing run with no interactions with cluster")] + public bool TestRun { get; set; } = false; + [Option("remove-unused-replicas", Required = false, HelpText = "Remove files in unused replicas")] public bool RemoveUnusedReplicas { get; set; } = false; diff --git a/src/ClusterModifier/Program.cs b/src/ClusterModifier/Program.cs index 1e72b48..b21dd17 100644 --- a/src/ClusterModifier/Program.cs +++ b/src/ClusterModifier/Program.cs @@ -23,10 +23,26 @@ CancellationToken cancellationToken .AddTransient() .AddTransient() .AddTransient() - .AddTransient() - .AddTransient() - .AddTransient() - .AddTransient(); + .AddTransient(); + if (arguments.TestRun) + { + services + .AddTransient< + INodeDiskRemoteDirsFinder, + TestModeImplementations.CommonImplementation + >() + .AddTransient() + .AddTransient() + .AddTransient(); + } + else + { + services + .AddTransient() + .AddTransient() + .AddTransient() + .AddTransient(); + } services.AddRemoteFileCopy(arguments.SshConfiguration, arguments.FilesFinderConfiguration); using var provider = services.BuildServiceProvider(); diff --git a/src/ClusterModifier/TestModeImplementations/CommonImplementation.cs b/src/ClusterModifier/TestModeImplementations/CommonImplementation.cs new file mode 100644 index 0000000..534aeb9 --- /dev/null +++ b/src/ClusterModifier/TestModeImplementations/CommonImplementation.cs @@ -0,0 +1,98 @@ +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using BobApi.BobEntities; +using BobToolsCli.Exceptions; +using Microsoft.Extensions.Logging; +using RemoteFileCopy.Entities; + +namespace ClusterModifier.TestModeImplementations; + +public class CommonImplementation + : IConfigurationsFinder, + INodeDiskRemoteDirsFinder, + ICopier, + IRemover +{ + private readonly ClusterExpandArguments _args; + private readonly ILogger _logger; + + public CommonImplementation(ClusterExpandArguments args, ILogger logger) + { + _args = args; + _logger = logger; + } + + public Task Copy( + List copyOperations, + int copyParallelDegree, + CancellationToken cancellationToken + ) + { + _logger.LogInformation( + "Performing {Count} copy operations, {Degree} max parallel", + copyOperations.Count, + copyParallelDegree + ); + foreach (var op in copyOperations) + _logger.LogInformation("Copy from {From} to {To}", op.From, op.To); + return Task.CompletedTask; + } + + public async Task FindNewConfig(CancellationToken cancellationToken) + { + if (_args.BootstrapNode != null) + throw new ConfigurationException("Only cluster configuration files can be specified"); + return ( + await _args.GetClusterConfigurationFromFile(_args.ClusterConfigPath, cancellationToken) + ).Unwrap(); + } + + public async Task FindOldConfig(CancellationToken cancellationToken) + { + return ( + await _args.GetClusterConfigurationFromFile(_args.OldConfigPath, cancellationToken) + ).Unwrap(); + } + + public Task>> FindRemoteDirByDiskByNode( + ClusterConfiguration config, + CancellationToken cancellationToken + ) + { + return Task.FromResult( + config + .Nodes + .ToDictionary( + n => n.Name, + n => + n.Disks.ToDictionary( + d => d.Name, + d => new RemoteDir(System.Net.IPAddress.None, $"/{n.Name}/{d.Name}") + ) + ) + ); + } + + public Task Remove( + List confirmed, + List unconfirmed, + bool forceRemoveUnconfirmed, + CancellationToken cancellationToken + ) + { + _logger.LogInformation("Performing {Count} confirmed remove operations", confirmed.Count); + foreach (var op in confirmed) + _logger.LogInformation( + "Check that {@Dirs} contains all data from {Dir}, then remove it", + op.Copies, + op.DirToDelete + ); + if (forceRemoveUnconfirmed) + _logger.LogInformation("Will remove unconfirmed dirs even if error occured"); + foreach (var dir in unconfirmed) + _logger.LogInformation("Remove dir {Dir} without any confirmation", dir); + return Task.CompletedTask; + } +} From 5eebc2e459f1ceb2324fa14f1710840434677e6f Mon Sep 17 00:00:00 2001 From: Ivan Druzhitskiy Date: Fri, 5 Jan 2024 17:51:33 +0300 Subject: [PATCH 11/21] ClusterModifier: refactoring --- src/ClusterModifier/ClusterStateFinder.cs | 16 ++++++---------- src/ClusterModifier/Implementations/Copier.cs | 4 ++-- 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/src/ClusterModifier/ClusterStateFinder.cs b/src/ClusterModifier/ClusterStateFinder.cs index ac8bf6c..be2d17d 100644 --- a/src/ClusterModifier/ClusterStateFinder.cs +++ b/src/ClusterModifier/ClusterStateFinder.cs @@ -11,29 +11,25 @@ namespace ClusterModifier; public class ClusterStateFinder { - private readonly INodeDiskRemoteDirsFinder _nodeDirsRemoteDirsFinder; + private readonly INodeDiskRemoteDirsFinder _nodeDiskRemoteDirsFinder; private readonly IConfigurationsFinder _configurationsFinder; public ClusterStateFinder( - INodeDiskRemoteDirsFinder nodeDirsRemoteDirsFinder, + INodeDiskRemoteDirsFinder nodeDiskRemoteDirsFinder, IConfigurationsFinder configurationsFinder ) { - _nodeDirsRemoteDirsFinder = nodeDirsRemoteDirsFinder; + _nodeDiskRemoteDirsFinder = nodeDiskRemoteDirsFinder; _configurationsFinder = configurationsFinder; } public async Task Find(CancellationToken cancellationToken) - { - return new ClusterState(await GetVDiskInfo(cancellationToken)); - } - - private async Task> GetVDiskInfo(CancellationToken cancellationToken) { var oldConfig = await _configurationsFinder.FindOldConfig(cancellationToken); var config = await _configurationsFinder.FindNewConfig(cancellationToken); - return await GetVDiskInfo(oldConfig, config, cancellationToken); + var vDiskInfo = await GetVDiskInfo(oldConfig, config, cancellationToken); + return new ClusterState(vDiskInfo); } private async Task> GetVDiskInfo( @@ -63,7 +59,7 @@ private async Task> GetRemoteDirFinder( CancellationToken cancellationToken ) { - var remoteDirByDiskByNode = await _nodeDirsRemoteDirsFinder.FindRemoteDirByDiskByNode( + var remoteDirByDiskByNode = await _nodeDiskRemoteDirsFinder.FindRemoteDirByDiskByNode( config, cancellationToken ); diff --git a/src/ClusterModifier/Implementations/Copier.cs b/src/ClusterModifier/Implementations/Copier.cs index 5258037..71e8b58 100644 --- a/src/ClusterModifier/Implementations/Copier.cs +++ b/src/ClusterModifier/Implementations/Copier.cs @@ -31,7 +31,7 @@ CancellationToken cancellationToken ParallelP2PProcessor.CreateOperation( op.From.Address, op.To.Address, - () => Copy(op, cancellationToken) + () => InvokeOperation(op, cancellationToken) ) ) .ToArray(); @@ -42,7 +42,7 @@ await _parallelP2PProcessor.Invoke( ); } - private async Task Copy(CopyOperation op, CancellationToken cancellationToken) + private async Task InvokeOperation(CopyOperation op, CancellationToken cancellationToken) { var copyResult = await _remoteFileCopier.Copy(op.From, op.To, cancellationToken); if (copyResult.IsError) From 9bb7395822d2edd1cc88f0ddf5ef36c0002347c4 Mon Sep 17 00:00:00 2001 From: Ivan Druzhitskiy Date: Fri, 5 Jan 2024 17:57:29 +0300 Subject: [PATCH 12/21] ClusterModifier: refactoring --- src/ClusterModifier/INodeDiskRemoteDirsFinder.cs | 2 +- src/ClusterModifier/Implementations/NodeDiskRemoteDirsFinder.cs | 2 +- .../TestModeImplementations/CommonImplementation.cs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/ClusterModifier/INodeDiskRemoteDirsFinder.cs b/src/ClusterModifier/INodeDiskRemoteDirsFinder.cs index 6537815..7fb42aa 100644 --- a/src/ClusterModifier/INodeDiskRemoteDirsFinder.cs +++ b/src/ClusterModifier/INodeDiskRemoteDirsFinder.cs @@ -8,6 +8,6 @@ namespace ClusterModifier; public interface INodeDiskRemoteDirsFinder { - Task>> FindRemoteDirByDiskByNode(ClusterConfiguration config, CancellationToken cancellationToken); + Task>> FindRemoteRootDirByDiskByNode(ClusterConfiguration config, CancellationToken cancellationToken); } diff --git a/src/ClusterModifier/Implementations/NodeDiskRemoteDirsFinder.cs b/src/ClusterModifier/Implementations/NodeDiskRemoteDirsFinder.cs index 85d808f..f269968 100644 --- a/src/ClusterModifier/Implementations/NodeDiskRemoteDirsFinder.cs +++ b/src/ClusterModifier/Implementations/NodeDiskRemoteDirsFinder.cs @@ -17,7 +17,7 @@ public NodeDiskRemoteDirsFinder(ClusterExpandArguments args) _args = args; } - public async Task>> FindRemoteDirByDiskByNode( + public async Task>> FindRemoteRootDirByDiskByNode( ClusterConfiguration config, CancellationToken cancellationToken ) diff --git a/src/ClusterModifier/TestModeImplementations/CommonImplementation.cs b/src/ClusterModifier/TestModeImplementations/CommonImplementation.cs index 534aeb9..e7fa7b5 100644 --- a/src/ClusterModifier/TestModeImplementations/CommonImplementation.cs +++ b/src/ClusterModifier/TestModeImplementations/CommonImplementation.cs @@ -56,7 +56,7 @@ await _args.GetClusterConfigurationFromFile(_args.OldConfigPath, cancellationTok ).Unwrap(); } - public Task>> FindRemoteDirByDiskByNode( + public Task>> FindRemoteRootDirByDiskByNode( ClusterConfiguration config, CancellationToken cancellationToken ) From e4b9f79fd81c766c339a06c0864a6ce76fc31a43 Mon Sep 17 00:00:00 2001 From: Ivan Druzhitskiy Date: Fri, 5 Jan 2024 18:20:44 +0300 Subject: [PATCH 13/21] ClusterModifier: refactoring --- src/ClusterModifier/ClusterStateFinder.cs | 2 +- .../Implementations/NodeDiskRemoteDirsFinder.cs | 17 ++++++++++------- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/src/ClusterModifier/ClusterStateFinder.cs b/src/ClusterModifier/ClusterStateFinder.cs index be2d17d..656d1e7 100644 --- a/src/ClusterModifier/ClusterStateFinder.cs +++ b/src/ClusterModifier/ClusterStateFinder.cs @@ -59,7 +59,7 @@ private async Task> GetRemoteDirFinder( CancellationToken cancellationToken ) { - var remoteDirByDiskByNode = await _nodeDiskRemoteDirsFinder.FindRemoteDirByDiskByNode( + var remoteDirByDiskByNode = await _nodeDiskRemoteDirsFinder.FindRemoteRootDirByDiskByNode( config, cancellationToken ); diff --git a/src/ClusterModifier/Implementations/NodeDiskRemoteDirsFinder.cs b/src/ClusterModifier/Implementations/NodeDiskRemoteDirsFinder.cs index f269968..f096c20 100644 --- a/src/ClusterModifier/Implementations/NodeDiskRemoteDirsFinder.cs +++ b/src/ClusterModifier/Implementations/NodeDiskRemoteDirsFinder.cs @@ -17,7 +17,9 @@ public NodeDiskRemoteDirsFinder(ClusterExpandArguments args) _args = args; } - public async Task>> FindRemoteRootDirByDiskByNode( + public async Task< + Dictionary> + > FindRemoteRootDirByDiskByNode( ClusterConfiguration config, CancellationToken cancellationToken ) @@ -26,7 +28,12 @@ CancellationToken cancellationToken foreach (var node in config.Nodes) { var remoteDirByDisk = await GetRemoteDirByDisk(node, cancellationToken); - result.Add(node.Name, remoteDirByDisk); + var rootDir = await _args.GetRootDir(node, cancellationToken); + var remoteRootDirByDisk = remoteDirByDisk.ToDictionary( + kv => kv.Key, + kv => kv.Value.GetSubdir(rootDir) + ); + result.Add(node.Name, remoteRootDirByDisk); } return result; @@ -38,10 +45,6 @@ CancellationToken cancellationToken ) { var addr = await node.FindIPAddress(); - var rootDir = await _args.GetRootDir(node, cancellationToken); - return node.Disks.ToDictionary( - d => d.Name, - d => new RemoteDir(addr, Path.Combine(d.Path, rootDir)) - ); + return node.Disks.ToDictionary(d => d.Name, d => new RemoteDir(addr, d.Path)); } } From f5b5a24d6f155d27d433c8857677758e5c2ff042 Mon Sep 17 00:00:00 2001 From: Ivan Druzhitskiy Date: Fri, 5 Jan 2024 18:37:07 +0300 Subject: [PATCH 14/21] ClusterModifier: add alien dir finding --- src/ClusterModifier/ClusterStateFinder.cs | 24 ++++++++++++++---- .../INodeDiskRemoteDirsFinder.cs | 10 ++++++-- .../NodeDiskRemoteDirsFinder.cs | 22 +++++++++++++++- .../CommonImplementation.cs | 25 ++++++++++++++++++- 4 files changed, 72 insertions(+), 9 deletions(-) diff --git a/src/ClusterModifier/ClusterStateFinder.cs b/src/ClusterModifier/ClusterStateFinder.cs index 656d1e7..14f0390 100644 --- a/src/ClusterModifier/ClusterStateFinder.cs +++ b/src/ClusterModifier/ClusterStateFinder.cs @@ -29,7 +29,8 @@ public async Task Find(CancellationToken cancellationToken) var config = await _configurationsFinder.FindNewConfig(cancellationToken); var vDiskInfo = await GetVDiskInfo(oldConfig, config, cancellationToken); - return new ClusterState(vDiskInfo); + var alienDirs = await GetAlienDirs(oldConfig, cancellationToken); + return new ClusterState(vDiskInfo, alienDirs); } private async Task> GetVDiskInfo( @@ -41,8 +42,8 @@ CancellationToken cancellationToken var vDiskPairs = oldConfig .VDisks .Join(config.VDisks, vd => vd.Id, vd => vd.Id, (ovd, vd) => (ovd, vd)); - var findOldDir = await GetRemoteDirFinder(oldConfig, "old", cancellationToken); - var findNewDir = await GetRemoteDirFinder(config, "new", cancellationToken); + var findOldDir = await GetRootRemoteDirFinder(oldConfig, "old", cancellationToken); + var findNewDir = await GetRootRemoteDirFinder(config, "new", cancellationToken); var result = new List(); foreach (var (oldVDisk, vDisk) in vDiskPairs) { @@ -53,7 +54,20 @@ CancellationToken cancellationToken return result; } - private async Task> GetRemoteDirFinder( + private async Task> GetAlienDirs( + ClusterConfiguration config, + CancellationToken cancellationToken + ) + { + var remoteAlienDirByDiskByNode = + await _nodeDiskRemoteDirsFinder.FindRemoteAlienDirByDiskByNode( + config, + cancellationToken + ); + return remoteAlienDirByDiskByNode.Values.SelectMany(d => d.Values).ToList(); + } + + private async Task> GetRootRemoteDirFinder( ClusterConfiguration config, string clusterConfigName, CancellationToken cancellationToken @@ -77,7 +91,7 @@ CancellationToken cancellationToken } } -public record struct ClusterState(List VDiskInfo); +public record struct ClusterState(List VDiskInfo, List AlienDirs); public record struct VDiskInfo( ClusterConfiguration.VDisk VDisk, diff --git a/src/ClusterModifier/INodeDiskRemoteDirsFinder.cs b/src/ClusterModifier/INodeDiskRemoteDirsFinder.cs index 7fb42aa..a5e1e4d 100644 --- a/src/ClusterModifier/INodeDiskRemoteDirsFinder.cs +++ b/src/ClusterModifier/INodeDiskRemoteDirsFinder.cs @@ -8,6 +8,12 @@ namespace ClusterModifier; public interface INodeDiskRemoteDirsFinder { - Task>> FindRemoteRootDirByDiskByNode(ClusterConfiguration config, CancellationToken cancellationToken); + Task>> FindRemoteRootDirByDiskByNode( + ClusterConfiguration config, + CancellationToken cancellationToken + ); + Task>> FindRemoteAlienDirByDiskByNode( + ClusterConfiguration config, + CancellationToken cancellationToken + ); } - diff --git a/src/ClusterModifier/Implementations/NodeDiskRemoteDirsFinder.cs b/src/ClusterModifier/Implementations/NodeDiskRemoteDirsFinder.cs index f096c20..3da2c3e 100644 --- a/src/ClusterModifier/Implementations/NodeDiskRemoteDirsFinder.cs +++ b/src/ClusterModifier/Implementations/NodeDiskRemoteDirsFinder.cs @@ -1,5 +1,4 @@ using System.Collections.Generic; -using System.IO; using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -10,6 +9,7 @@ namespace ClusterModifier; public class NodeDiskRemoteDirsFinder : INodeDiskRemoteDirsFinder { + private const string AlienDir = "alien"; private readonly ClusterExpandArguments _args; public NodeDiskRemoteDirsFinder(ClusterExpandArguments args) @@ -17,6 +17,26 @@ public NodeDiskRemoteDirsFinder(ClusterExpandArguments args) _args = args; } + public async Task< + Dictionary> + > FindRemoteAlienDirByDiskByNode( + ClusterConfiguration config, + CancellationToken cancellationToken + ) + { + var result = new Dictionary>(); + foreach (var node in config.Nodes) + { + var remoteDirByDisk = await GetRemoteDirByDisk(node, cancellationToken); + result.Add(node.Name, remoteDirByDisk); + } + + return result.ToDictionary( + kv => kv.Key, + kv => kv.Value.ToDictionary(kv1 => kv1.Key, kv1 => kv1.Value.GetSubdir(AlienDir)) + ); + } + public async Task< Dictionary> > FindRemoteRootDirByDiskByNode( diff --git a/src/ClusterModifier/TestModeImplementations/CommonImplementation.cs b/src/ClusterModifier/TestModeImplementations/CommonImplementation.cs index e7fa7b5..bd5d87f 100644 --- a/src/ClusterModifier/TestModeImplementations/CommonImplementation.cs +++ b/src/ClusterModifier/TestModeImplementations/CommonImplementation.cs @@ -56,6 +56,29 @@ await _args.GetClusterConfigurationFromFile(_args.OldConfigPath, cancellationTok ).Unwrap(); } + public Task>> FindRemoteAlienDirByDiskByNode( + ClusterConfiguration config, + CancellationToken cancellationToken + ) + { + return Task.FromResult( + config + .Nodes + .ToDictionary( + n => n.Name, + n => + n.Disks.ToDictionary( + d => d.Name, + d => + new RemoteDir( + System.Net.IPAddress.None, + $"/{n.Name}/{d.Name}/alien" + ) + ) + ) + ); + } + public Task>> FindRemoteRootDirByDiskByNode( ClusterConfiguration config, CancellationToken cancellationToken @@ -69,7 +92,7 @@ CancellationToken cancellationToken n => n.Disks.ToDictionary( d => d.Name, - d => new RemoteDir(System.Net.IPAddress.None, $"/{n.Name}/{d.Name}") + d => new RemoteDir(System.Net.IPAddress.None, $"/{n.Name}/{d.Name}/bob") ) ) ); From 60d7534826b5a7008b9b47e2a615c82c28efb76b Mon Sep 17 00:00:00 2001 From: Ivan Druzhitskiy Date: Mon, 8 Jan 2024 15:36:26 +0300 Subject: [PATCH 15/21] ClusterModifier: refactor remote dirs finding --- src/ClusterModifier/ClusterStateFinder.cs | 38 ++++++++++++----------- 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/src/ClusterModifier/ClusterStateFinder.cs b/src/ClusterModifier/ClusterStateFinder.cs index 14f0390..f37ea8a 100644 --- a/src/ClusterModifier/ClusterStateFinder.cs +++ b/src/ClusterModifier/ClusterStateFinder.cs @@ -39,17 +39,18 @@ private async Task> GetVDiskInfo( CancellationToken cancellationToken ) { - var vDiskPairs = oldConfig - .VDisks - .Join(config.VDisks, vd => vd.Id, vd => vd.Id, (ovd, vd) => (ovd, vd)); - var findOldDir = await GetRootRemoteDirFinder(oldConfig, "old", cancellationToken); - var findNewDir = await GetRootRemoteDirFinder(config, "new", cancellationToken); + var vDiskPairs = oldConfig.VDisks.Join( + config.VDisks, + vd => vd.Id, + vd => vd.Id, + (ovd, vd) => (ovd, vd) + ); + var findOldDirs = await GetVDiskRemoteDirsFinder(oldConfig, "old", cancellationToken); + var findNewDirs = await GetVDiskRemoteDirsFinder(config, "new", cancellationToken); var result = new List(); foreach (var (oldVDisk, vDisk) in vDiskPairs) { - var oldDirs = oldVDisk.Replicas.Select(r => findOldDir(r.Node, r.Disk, oldVDisk.Id)); - var newDirs = vDisk.Replicas.Select(r => findNewDir(r.Node, r.Disk, vDisk.Id)); - result.Add(new VDiskInfo(vDisk, oldDirs.ToArray(), newDirs.ToArray())); + result.Add(new VDiskInfo(vDisk, findOldDirs(oldVDisk), findNewDirs(vDisk))); } return result; } @@ -67,27 +68,28 @@ await _nodeDiskRemoteDirsFinder.FindRemoteAlienDirByDiskByNode( return remoteAlienDirByDiskByNode.Values.SelectMany(d => d.Values).ToList(); } - private async Task> GetRootRemoteDirFinder( + private async Task> GetVDiskRemoteDirsFinder( ClusterConfiguration config, string clusterConfigName, CancellationToken cancellationToken ) { - var remoteDirByDiskByNode = await _nodeDiskRemoteDirsFinder.FindRemoteRootDirByDiskByNode( + var remoteDirs = await _nodeDiskRemoteDirsFinder.FindRemoteRootDirByDiskByNode( config, cancellationToken ); - return (node, disk, vDiskId) => + RemoteDir FindReplicaRemoteDir( + ClusterConfiguration.VDisk v, + ClusterConfiguration.VDisk.Replica r + ) { - if ( - remoteDirByDiskByNode.TryGetValue(node, out var d) - && d.TryGetValue(disk, out var rd) - ) - return rd.GetSubdir(vDiskId.ToString()); + if (remoteDirs.TryGetValue(r.Node, out var d) && d.TryGetValue(r.Disk, out var rd)) + return rd.GetSubdir(v.Id.ToString()); throw new ClusterStateException( - $"Disk {disk} not found on node {node} in cluster config \"{clusterConfigName}\"" + $"Disk {r.Disk} not found on node {r.Node} in cluster config \"{clusterConfigName}\"" ); - }; + } + return (vDisk) => vDisk.Replicas.Select(r => FindReplicaRemoteDir(vDisk, r)).ToArray(); } } From d7317b9d1dd4d1bb82cae367aadc9f6b56d62398 Mon Sep 17 00:00:00 2001 From: Ivan Druzhitskiy Date: Mon, 8 Jan 2024 15:39:55 +0300 Subject: [PATCH 16/21] ClusterModifier: refactoring --- src/ClusterModifier/ClusterStateFinder.cs | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/src/ClusterModifier/ClusterStateFinder.cs b/src/ClusterModifier/ClusterStateFinder.cs index f37ea8a..4117438 100644 --- a/src/ClusterModifier/ClusterStateFinder.cs +++ b/src/ClusterModifier/ClusterStateFinder.cs @@ -39,20 +39,16 @@ private async Task> GetVDiskInfo( CancellationToken cancellationToken ) { - var vDiskPairs = oldConfig.VDisks.Join( - config.VDisks, - vd => vd.Id, - vd => vd.Id, - (ovd, vd) => (ovd, vd) - ); var findOldDirs = await GetVDiskRemoteDirsFinder(oldConfig, "old", cancellationToken); var findNewDirs = await GetVDiskRemoteDirsFinder(config, "new", cancellationToken); - var result = new List(); - foreach (var (oldVDisk, vDisk) in vDiskPairs) - { - result.Add(new VDiskInfo(vDisk, findOldDirs(oldVDisk), findNewDirs(vDisk))); - } - return result; + return oldConfig + .VDisks.Join( + config.VDisks, + vd => vd.Id, + vd => vd.Id, + (ovd, vd) => new VDiskInfo(vd, findOldDirs(ovd), findNewDirs(vd)) + ) + .ToList(); } private async Task> GetAlienDirs( From 34194523e09028613713f1b36088e67f33e3d4a0 Mon Sep 17 00:00:00 2001 From: Ivan Druzhitskiy Date: Mon, 8 Jan 2024 16:42:04 +0300 Subject: [PATCH 17/21] ClusterModifier: add aliens check --- src/ClusterModifier/ClusterExpandArguments.cs | 3 + src/ClusterModifier/ClusterStateAlterer.cs | 1 + src/ClusterModifier/ClusterStateFinder.cs | 17 ++++- src/ClusterModifier/IValidator.cs | 9 +++ .../Implementations/Validator.cs | 68 +++++++++++++++++++ src/ClusterModifier/Program.cs | 6 +- .../CommonImplementation.cs | 48 +++++++------ .../FilesFinding/FilesFinder.cs | 36 +++++++++- src/RemoteFileCopy/IRemoteFileCopier.cs | 2 + .../LocalOptimizedRemoteFileCopier.cs | 27 ++++++++ src/RemoteFileCopy/RsyncRemoteFileCopier.cs | 6 +- 11 files changed, 190 insertions(+), 33 deletions(-) create mode 100644 src/ClusterModifier/IValidator.cs create mode 100644 src/ClusterModifier/Implementations/Validator.cs diff --git a/src/ClusterModifier/ClusterExpandArguments.cs b/src/ClusterModifier/ClusterExpandArguments.cs index be33391..bc400aa 100644 --- a/src/ClusterModifier/ClusterExpandArguments.cs +++ b/src/ClusterModifier/ClusterExpandArguments.cs @@ -33,6 +33,9 @@ public class ClusterExpandArguments : CommonWithSshArguments [Option("copy-parallel-degree", HelpText = "Number of simultaneous copy processes", Default = 1)] public int CopyParallelDegree { get; set; } + [Option("skip-alien-presence-check", HelpText = "Do not check for alien existence before cluster expansion", Default = false)] + public bool SkipAlienPresenceCheck { get; set; } + public async ValueTask GetRootDir( ClusterConfiguration.Node node, CancellationToken cancellationToken = default diff --git a/src/ClusterModifier/ClusterStateAlterer.cs b/src/ClusterModifier/ClusterStateAlterer.cs index 9f4b260..612c016 100644 --- a/src/ClusterModifier/ClusterStateAlterer.cs +++ b/src/ClusterModifier/ClusterStateAlterer.cs @@ -16,6 +16,7 @@ public ClusterStateAlterer( ClusterExpandArguments args, ICopier copier, IRemover remover, + IValidator validator, ILogger logger ) { diff --git a/src/ClusterModifier/ClusterStateFinder.cs b/src/ClusterModifier/ClusterStateFinder.cs index 4117438..2d4bf9f 100644 --- a/src/ClusterModifier/ClusterStateFinder.cs +++ b/src/ClusterModifier/ClusterStateFinder.cs @@ -13,24 +13,37 @@ public class ClusterStateFinder { private readonly INodeDiskRemoteDirsFinder _nodeDiskRemoteDirsFinder; private readonly IConfigurationsFinder _configurationsFinder; + private readonly IValidator _validator; public ClusterStateFinder( INodeDiskRemoteDirsFinder nodeDiskRemoteDirsFinder, - IConfigurationsFinder configurationsFinder + IConfigurationsFinder configurationsFinder, + IValidator validator ) { _nodeDiskRemoteDirsFinder = nodeDiskRemoteDirsFinder; _configurationsFinder = configurationsFinder; + _validator = validator; } public async Task Find(CancellationToken cancellationToken) + { + var state = await GetState(cancellationToken); + + await _validator.Validate(state, cancellationToken); + + return state; + } + + private async Task GetState(CancellationToken cancellationToken) { var oldConfig = await _configurationsFinder.FindOldConfig(cancellationToken); var config = await _configurationsFinder.FindNewConfig(cancellationToken); var vDiskInfo = await GetVDiskInfo(oldConfig, config, cancellationToken); var alienDirs = await GetAlienDirs(oldConfig, cancellationToken); - return new ClusterState(vDiskInfo, alienDirs); + var state = new ClusterState(vDiskInfo, alienDirs); + return state; } private async Task> GetVDiskInfo( diff --git a/src/ClusterModifier/IValidator.cs b/src/ClusterModifier/IValidator.cs new file mode 100644 index 0000000..a486ea7 --- /dev/null +++ b/src/ClusterModifier/IValidator.cs @@ -0,0 +1,9 @@ +using System.Threading; +using System.Threading.Tasks; + +namespace ClusterModifier; + +public interface IValidator +{ + Task Validate(ClusterState clusterState, CancellationToken cancellationToken); +} diff --git a/src/ClusterModifier/Implementations/Validator.cs b/src/ClusterModifier/Implementations/Validator.cs new file mode 100644 index 0000000..83ca464 --- /dev/null +++ b/src/ClusterModifier/Implementations/Validator.cs @@ -0,0 +1,68 @@ +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using BobToolsCli.Exceptions; +using Microsoft.Extensions.Logging; +using RemoteFileCopy; +using RemoteFileCopy.Entities; + +namespace ClusterModifier; + +public class Validator : IValidator +{ + private readonly ClusterExpandArguments _args; + private readonly IRemoteFileCopier _remoteFileCopier; + private readonly ILogger _logger; + + public Validator( + ClusterExpandArguments args, + IRemoteFileCopier remoteFileCopier, + ILogger logger + ) + { + _args = args; + _remoteFileCopier = remoteFileCopier; + _logger = logger; + } + + public async Task Validate(ClusterState clusterState, CancellationToken cancellationToken) + { + if (!_args.SkipAlienPresenceCheck) + { + var fileContainingAlienDirs = await FindFileContainingDirs( + clusterState.AlienDirs, + cancellationToken + ); + if (fileContainingAlienDirs.Count > 0) + { + foreach (var dir in fileContainingAlienDirs) + { + _logger.LogError("Alien dir contain files: {Dir}", dir); + } + throw new ClusterStateException( + $"{fileContainingAlienDirs.Count} alien dirs contain files" + ); + } + } + } + + private async Task> FindFileContainingDirs( + List alienDirs, + CancellationToken cancellationToken + ) + { + var result = new List(); + foreach (var dir in alienDirs) + { + if ( + await _remoteFileCopier.DirContainsFiles( + dir, + recursive: true, + cancellationToken: cancellationToken + ) + ) + result.Add(dir); + } + return result; + } +} diff --git a/src/ClusterModifier/Program.cs b/src/ClusterModifier/Program.cs index b21dd17..4ce8810 100644 --- a/src/ClusterModifier/Program.cs +++ b/src/ClusterModifier/Program.cs @@ -33,7 +33,8 @@ CancellationToken cancellationToken >() .AddTransient() .AddTransient() - .AddTransient(); + .AddTransient() + .AddTransient(); } else { @@ -41,7 +42,8 @@ CancellationToken cancellationToken .AddTransient() .AddTransient() .AddTransient() - .AddTransient(); + .AddTransient() + .AddTransient(); } services.AddRemoteFileCopy(arguments.SshConfiguration, arguments.FilesFinderConfiguration); using var provider = services.BuildServiceProvider(); diff --git a/src/ClusterModifier/TestModeImplementations/CommonImplementation.cs b/src/ClusterModifier/TestModeImplementations/CommonImplementation.cs index bd5d87f..0730268 100644 --- a/src/ClusterModifier/TestModeImplementations/CommonImplementation.cs +++ b/src/ClusterModifier/TestModeImplementations/CommonImplementation.cs @@ -13,7 +13,8 @@ public class CommonImplementation : IConfigurationsFinder, INodeDiskRemoteDirsFinder, ICopier, - IRemover + IRemover, + IValidator { private readonly ClusterExpandArguments _args; private readonly ILogger _logger; @@ -62,20 +63,14 @@ CancellationToken cancellationToken ) { return Task.FromResult( - config - .Nodes - .ToDictionary( - n => n.Name, - n => - n.Disks.ToDictionary( - d => d.Name, - d => - new RemoteDir( - System.Net.IPAddress.None, - $"/{n.Name}/{d.Name}/alien" - ) - ) - ) + config.Nodes.ToDictionary( + n => n.Name, + n => + n.Disks.ToDictionary( + d => d.Name, + d => new RemoteDir(System.Net.IPAddress.None, $"/{n.Name}/{d.Name}/alien") + ) + ) ); } @@ -85,16 +80,14 @@ CancellationToken cancellationToken ) { return Task.FromResult( - config - .Nodes - .ToDictionary( - n => n.Name, - n => - n.Disks.ToDictionary( - d => d.Name, - d => new RemoteDir(System.Net.IPAddress.None, $"/{n.Name}/{d.Name}/bob") - ) - ) + config.Nodes.ToDictionary( + n => n.Name, + n => + n.Disks.ToDictionary( + d => d.Name, + d => new RemoteDir(System.Net.IPAddress.None, $"/{n.Name}/{d.Name}/bob") + ) + ) ); } @@ -118,4 +111,9 @@ CancellationToken cancellationToken _logger.LogInformation("Remove dir {Dir} without any confirmation", dir); return Task.CompletedTask; } + + public Task Validate(ClusterState clusterState, CancellationToken cancellationToken) + { + return Task.CompletedTask; + } } diff --git a/src/RemoteFileCopy/FilesFinding/FilesFinder.cs b/src/RemoteFileCopy/FilesFinding/FilesFinder.cs index 5aae264..80d4a42 100644 --- a/src/RemoteFileCopy/FilesFinding/FilesFinder.cs +++ b/src/RemoteFileCopy/FilesFinding/FilesFinder.cs @@ -1,5 +1,6 @@ -using System; +using System; using System.Collections.Generic; +using System.Linq; using System.Text.RegularExpressions; using System.Threading; using System.Threading.Tasks; @@ -31,7 +32,7 @@ internal async Task> FindFiles(RemoteDir dir, CancellationToken cancellationToken = default) { var function = GetHashFunction(); - var sshResult = await _sshWrapper.InvokeSshProcess(dir.Address, $"bash << {GetBashHereDoc(dir.Path, function)}", cancellationToken); + var sshResult = await _sshWrapper.InvokeSshProcess(dir.Address, $"bash << {GetEnumerateFilesBashHereDoc(dir.Path, function)}", cancellationToken); if (sshResult.IsError) { @@ -53,6 +54,22 @@ internal async Task> FindFiles(RemoteDir dir, return result; } + internal async Task FileExists(RemoteDir dir, bool recurse, CancellationToken cancellationToken) + { + var doc = GetFileExistsBashHereDoc(dir.Path, recurse); + var sshResult = await _sshWrapper.InvokeSshProcess(dir.Address, $"bash << {doc}", cancellationToken); + + if (sshResult.IsError) + { + _logger.LogWarning("Failed to check files existence in {dir}", dir); + _logger.LogDebug("Finder stderr: {output}", string.Join(Environment.NewLine, sshResult.StdErr)); + _logger.LogDebug("Finder stdout: {output}", string.Join(Environment.NewLine, sshResult.StdOut)); + throw new CommandLineFailureException("find"); + } + + return sshResult.StdOut.Any(); + } + private string GetHashFunction() { return _filesFinderConfiguration.HashType switch @@ -63,7 +80,7 @@ private string GetHashFunction() }; } - internal static string GetBashHereDoc(string path, string hashFunction) + internal static string GetEnumerateFilesBashHereDoc(string path, string hashFunction) { return $@"'EOF' if [ -d {path} ] @@ -74,6 +91,19 @@ internal static string GetBashHereDoc(string path, string hashFunction) echo f$f l$size c$hash done fi +EOF"; + } + + internal static string GetFileExistsBashHereDoc(string path, bool recurse) + { + var additionalArgs= ""; + if (!recurse) + additionalArgs = " -maxdepth 1"; + return $@"'EOF' +if [ -d {path} ] +then + echo $(find {path} -type f{additionalArgs} 2>/dev/null | head -n 1) +fi EOF"; } } diff --git a/src/RemoteFileCopy/IRemoteFileCopier.cs b/src/RemoteFileCopy/IRemoteFileCopier.cs index 7b4d4b4..67b54c7 100644 --- a/src/RemoteFileCopy/IRemoteFileCopier.cs +++ b/src/RemoteFileCopy/IRemoteFileCopier.cs @@ -17,5 +17,7 @@ public interface IRemoteFileCopier Task RemoveAlreadyMovedFiles(RemoteDir from, RemoteDir to, CancellationToken cancellationToken = default); Task SourceCopiedToDest(RemoteDir from, RemoteDir to, CancellationToken cancellationToken = default); + + Task DirContainsFiles(RemoteDir dir, bool recursive = true, CancellationToken cancellationToken = default); } } diff --git a/src/RemoteFileCopy/LocalOptimizedRemoteFileCopier.cs b/src/RemoteFileCopy/LocalOptimizedRemoteFileCopier.cs index 0ead95e..9500b19 100644 --- a/src/RemoteFileCopy/LocalOptimizedRemoteFileCopier.cs +++ b/src/RemoteFileCopy/LocalOptimizedRemoteFileCopier.cs @@ -89,6 +89,17 @@ public async Task RemoveAlreadyMovedFiles(RemoteDir from, RemoteDir to, Can return await _remoteFileCopier.RemoveAlreadyMovedFiles(from, to, cancellationToken); } + public async Task DirContainsFiles(RemoteDir dir, bool recursive = true, CancellationToken cancellationToken = default) + { + cancellationToken.ThrowIfCancellationRequested(); + if (TryGetLocalPath(dir, out var path)) + { + return DirContainsFiles(path, recursive, cancellationToken); + } + else + return await _remoteFileCopier.DirContainsFiles(dir, recursive, cancellationToken); + } + public async Task SourceCopiedToDest(RemoteDir from, RemoteDir to, CancellationToken cancellationToken = default) { cancellationToken.ThrowIfCancellationRequested(); @@ -151,6 +162,22 @@ private int RemoveLocalAlreadyMovedFiles(string fromPath, string toPath, return count; } + private bool DirContainsFiles(string path, bool recursive, CancellationToken cancellationToken) + { + if (!Directory.Exists(path)) + return false; + + if (Directory.EnumerateFiles(path).Any()) + return true; + + if (recursive) + { + return Directory.EnumerateDirectories(path) + .Any(d => DirContainsFiles(d, recursive, cancellationToken)); + } + return false; + } + private bool TryGetLocalPath(RemoteDir dir, out string path) { path = dir.Path; diff --git a/src/RemoteFileCopy/RsyncRemoteFileCopier.cs b/src/RemoteFileCopy/RsyncRemoteFileCopier.cs index 3d63323..82047a1 100644 --- a/src/RemoteFileCopy/RsyncRemoteFileCopier.cs +++ b/src/RemoteFileCopy/RsyncRemoteFileCopier.cs @@ -105,6 +105,11 @@ public async Task SourceCopiedToDest(RemoteDir from, RemoteDir to, Cancell return equal.IsSubsetOf(dstFiles.Select(f => (to, f))); } + public async Task DirContainsFiles(RemoteDir dir, bool recursive = true, CancellationToken cancellationToken = default) + { + return await _filesFinder.FileExists(dir, recursive, cancellationToken); + } + internal async Task RemoveFiles(IEnumerable fileInfos, CancellationToken cancellationToken = default) { var error = false; @@ -145,7 +150,6 @@ async Task InvokeSsh(string command) return !error; } - private class FileInfoComparer : IEqualityComparer<(RemoteDir dir, RemoteFileInfo file)> { public bool Equals((RemoteDir dir, RemoteFileInfo file) x, (RemoteDir dir, RemoteFileInfo file) y) From 749024280454ec3024098c78999d1e1c3de00726 Mon Sep 17 00:00:00 2001 From: Ivan Druzhitskiy Date: Mon, 8 Jan 2024 16:59:59 +0300 Subject: [PATCH 18/21] ClusterModifier: add check for vdisks count --- src/ClusterModifier/ClusterStateFinder.cs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/ClusterModifier/ClusterStateFinder.cs b/src/ClusterModifier/ClusterStateFinder.cs index 2d4bf9f..9e2e8d2 100644 --- a/src/ClusterModifier/ClusterStateFinder.cs +++ b/src/ClusterModifier/ClusterStateFinder.cs @@ -39,6 +39,7 @@ private async Task GetState(CancellationToken cancellationToken) { var oldConfig = await _configurationsFinder.FindOldConfig(cancellationToken); var config = await _configurationsFinder.FindNewConfig(cancellationToken); + ValidateConfigs(oldConfig, config); var vDiskInfo = await GetVDiskInfo(oldConfig, config, cancellationToken); var alienDirs = await GetAlienDirs(oldConfig, cancellationToken); @@ -46,6 +47,17 @@ private async Task GetState(CancellationToken cancellationToken) return state; } + private static void ValidateConfigs(ClusterConfiguration oldConfig, ClusterConfiguration config) + { + var oldVDiskIds = oldConfig.VDisks.Select(vd => vd.Id).ToHashSet(); + var newVDiskIds = config.VDisks.Select(vd => vd.Id).ToHashSet(); + oldVDiskIds.ExceptWith(newVDiskIds); + if (oldVDiskIds.Count > 0) + throw new ConfigurationException( + $"Cluster shrink detection, removed vdisks {string.Join(", ", oldVDiskIds)}" + ); + } + private async Task> GetVDiskInfo( ClusterConfiguration oldConfig, ClusterConfiguration config, From ff88f23fd180b3eb691eb51edd7b98d5de6a3b9b Mon Sep 17 00:00:00 2001 From: Ivan Druzhitskiy Date: Mon, 8 Jan 2024 17:01:51 +0300 Subject: [PATCH 19/21] ClusterModifier: fix typos --- src/ClusterModifier/ClusterExpandArguments.cs | 2 +- src/ClusterModifier/ClusterExpander.cs | 5 ----- src/ClusterModifier/ClusterStateAlterer.cs | 5 ++++- 3 files changed, 5 insertions(+), 7 deletions(-) diff --git a/src/ClusterModifier/ClusterExpandArguments.cs b/src/ClusterModifier/ClusterExpandArguments.cs index bc400aa..ee13f0d 100644 --- a/src/ClusterModifier/ClusterExpandArguments.cs +++ b/src/ClusterModifier/ClusterExpandArguments.cs @@ -18,7 +18,7 @@ public class ClusterExpandArguments : CommonWithSshArguments [Option("dry-run", Required = false, HelpText = "Do not copy anything")] public bool DryRun { get; set; } = false; - [Option("dry-run", Required = false, HelpText = "Special testing run with no interactions with cluster")] + [Option("test-run", Required = false, HelpText = "Special testing run with no interactions with cluster")] public bool TestRun { get; set; } = false; [Option("remove-unused-replicas", Required = false, HelpText = "Remove files in unused replicas")] diff --git a/src/ClusterModifier/ClusterExpander.cs b/src/ClusterModifier/ClusterExpander.cs index fdf8526..957c597 100644 --- a/src/ClusterModifier/ClusterExpander.cs +++ b/src/ClusterModifier/ClusterExpander.cs @@ -1,6 +1,5 @@ using System.Threading; using System.Threading.Tasks; -using Microsoft.Extensions.Logging; namespace ClusterModifier; @@ -9,10 +8,6 @@ public class ClusterExpander private readonly ClusterStateFinder _clusterStateFinder; private readonly WorkSpecificationFinder _workSpecificationFinder; private readonly ClusterStateAlterer _clusterStateAlterer; - private readonly ICopier _copier; - private readonly IRemover _remover; - private readonly ClusterExpandArguments _args; - private readonly ILogger _logger; public ClusterExpander( ClusterStateFinder clusterStateFinder, diff --git a/src/ClusterModifier/ClusterStateAlterer.cs b/src/ClusterModifier/ClusterStateAlterer.cs index 612c016..9d37950 100644 --- a/src/ClusterModifier/ClusterStateAlterer.cs +++ b/src/ClusterModifier/ClusterStateAlterer.cs @@ -73,7 +73,10 @@ CancellationToken cancellationToken op.DirToDelete ); foreach (var dir in workSpecification.UnconfirmedDeleteDirs) - _logger.LogInformation("Expected removing directory {Dir} without checking copies"); + _logger.LogInformation( + "Expected removing directory {Dir} without checking copies", + dir + ); } else { From ae87879fbf6e869ece469a3245a64d51144d394b Mon Sep 17 00:00:00 2001 From: Ivan Druzhitskiy Date: Mon, 8 Jan 2024 17:17:46 +0300 Subject: [PATCH 20/21] ClusterModifier: remove unnecessary arg --- src/ClusterModifier/ClusterStateAlterer.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/ClusterModifier/ClusterStateAlterer.cs b/src/ClusterModifier/ClusterStateAlterer.cs index 9d37950..f91f1f5 100644 --- a/src/ClusterModifier/ClusterStateAlterer.cs +++ b/src/ClusterModifier/ClusterStateAlterer.cs @@ -16,7 +16,6 @@ public ClusterStateAlterer( ClusterExpandArguments args, ICopier copier, IRemover remover, - IValidator validator, ILogger logger ) { From c22aee1adfc9a6458cd7d7b8870f20b7011c81a0 Mon Sep 17 00:00:00 2001 From: Ivan Druzhitskiy Date: Fri, 19 Jan 2024 12:23:37 +0300 Subject: [PATCH 21/21] ClusterModifier: fetch alien dirs from client --- .../NodeDiskRemoteDirsFinder.cs | 36 +++++++++++++++---- 1 file changed, 29 insertions(+), 7 deletions(-) diff --git a/src/ClusterModifier/Implementations/NodeDiskRemoteDirsFinder.cs b/src/ClusterModifier/Implementations/NodeDiskRemoteDirsFinder.cs index 3da2c3e..0777ade 100644 --- a/src/ClusterModifier/Implementations/NodeDiskRemoteDirsFinder.cs +++ b/src/ClusterModifier/Implementations/NodeDiskRemoteDirsFinder.cs @@ -3,13 +3,14 @@ using System.Threading; using System.Threading.Tasks; using BobApi.BobEntities; +using BobToolsCli.Exceptions; +using BobToolsCli.Helpers; using RemoteFileCopy.Entities; namespace ClusterModifier; public class NodeDiskRemoteDirsFinder : INodeDiskRemoteDirsFinder { - private const string AlienDir = "alien"; private readonly ClusterExpandArguments _args; public NodeDiskRemoteDirsFinder(ClusterExpandArguments args) @@ -25,16 +26,37 @@ CancellationToken cancellationToken ) { var result = new Dictionary>(); + var prov = _args.GetBobApiClientProvider(); foreach (var node in config.Nodes) { - var remoteDirByDisk = await GetRemoteDirByDisk(node, cancellationToken); - result.Add(node.Name, remoteDirByDisk); + var dir = await FindAlienDir(prov, node, cancellationToken); + // There is a possibility for multiple disks in future + result.Add(node.Name, new Dictionary { ["alien"] = dir }); + } + + return result; + } + + private static async Task FindAlienDir( + BobApiClientProvider prov, + ClusterConfiguration.Node node, + CancellationToken cancellationToken + ) + { + var client = prov.GetClient(node); + var alienDir = await client.GetAlienDirectory(cancellationToken); + RemoteDir dir; + if (alienDir.IsOk(out var d, out var err)) + { + var addr = await node.FindIPAddress(); + dir = new RemoteDir(addr, d.Path); + } + else + { + throw new ClusterStateException($"Failed to get alien dir for node {node.Name}: {err}"); } - return result.ToDictionary( - kv => kv.Key, - kv => kv.Value.ToDictionary(kv1 => kv1.Key, kv1 => kv1.Value.GetSubdir(AlienDir)) - ); + return dir; } public async Task<