diff --git a/.vscode/launch.json b/.vscode/launch.json
index 01c58dd..a2c7d81 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -10,7 +10,7 @@
             "request": "launch",
             "preLaunchTask": "build",
             // If you have changed target frameworks, make sure to update the program path.
-            "program": "${workspaceFolder}/bin/Debug/netcoreapp2.1/SmartBulkCopy.dll",
+            "program": "${workspaceFolder}/bin/Debug/netcoreapp3.1/SmartBulkCopy.dll",
             "args": [],
             "cwd": "${workspaceFolder}",
             // For more information about the 'console' field, see https://aka.ms/VSCode-CS-LaunchJson-Console
diff --git a/CopyInfo.cs b/CopyInfo.cs
index 7aa4f38..717d169 100644
--- a/CopyInfo.cs
+++ b/CopyInfo.cs
@@ -14,43 +14,6 @@ namespace SmartBulkCopy
 {
-
-    public class TableOrderInfo
-    {
-        public List<SortColumn> SortColumns = new List<SortColumn>();
-
-        public bool IsFound => SortColumns.Count > 0;
-
-        public string GetPartitionOrderBy()
-        {
-            var orderList = from c in SortColumns
-                            where c.OrdinalPosition == 0
-                            orderby c.OrdinalPosition
-                            select c.ColumnName + (c.IsDescending == true ? " DESC" : "");
-
-            return string.Join(",", orderList);
-        }
-
-        public string GetOrderBy(bool excludePartitionColumn = true)
-        {
-            int op = 0;
-            if (excludePartitionColumn == true) op = -1;
-
-            var orderList = from c in SortColumns
-                            where c.OrdinalPosition != op
-                            orderby c.OrdinalPosition
-                            select c.ColumnName + (c.IsDescending == true ? " DESC" : "");
-
-            return string.Join(",", orderList);
-        }
-    }
-
-    public class SortColumn {
-        public string ColumnName;
-        public int OrdinalPosition;
-        public bool IsDescending;
-    }
-
     enum OrderHintType
     {
         None,
@@ -60,11 +23,14 @@ enum OrderHintType
     abstract class CopyInfo
     {
-        public string TableName;
-        public List<string> Columns = new List<string>();
-        public TableOrderInfo OrderInfo = new TableOrderInfo();
+        public TableInfo SourceTableInfo = new UnknownTableInfo();
+        public TableInfo DestinationTableInfo = new UnknownTableInfo();
         public OrderHintType OrderHintType = OrderHintType.None;
-        public int PartitionNumber;
+        public int PartitionNumber;
+
+        public string TableName => SourceTableInfo?.TableName;
+        public List<string> Columns => SourceTableInfo.Columns;
+
         public abstract string GetPredicate();

         public string GetSelectList()
         {
@@ -72,7 +38,7 @@ public string GetSelectList()
         }

         public string GetOrderBy()
         {
-            return OrderInfo.GetOrderBy(excludePartitionColumn:true);
+            return SourceTableInfo.PrimaryIndex.GetOrderBy(excludePartitionColumn:true);
         }
     }
diff --git a/Dockerfile b/Dockerfile
index 484b12e..09eb701 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,4 +1,4 @@
-FROM mcr.microsoft.com/dotnet/core/sdk:2.1 AS build-env
+FROM mcr.microsoft.com/dotnet/core/sdk:3.1 AS build-env
 WORKDIR /app

 # Copy csproj and restore as distinct layers
@@ -10,7 +10,7 @@ COPY . ./
 RUN dotnet publish -c Release -o out

 # Build runtime image
-FROM mcr.microsoft.com/dotnet/core/runtime:2.1
+FROM mcr.microsoft.com/dotnet/core/runtime:3.1
 WORKDIR /app
 COPY --from=build-env /app/out .
 ENTRYPOINT ["dotnet", "SmartBulkCopy.dll", "config/smartbulkcopy.config"]
diff --git a/README.md b/README.md
index 0ed6163..00cf512 100644
--- a/README.md
+++ b/README.md
@@ -6,7 +6,7 @@
 It can be used to efficiently and quickly move data between two instances of SQL Server.

 ## How it works

-Smart Bulk Copy usese [Bulk Copy API](https://docs.microsoft.com/en-us/dotnet/api/system.data.sqlclient.sqlbulkcopy) with parallel tasks. A source table is split in partitions, and each partition is copied in parallel with others, up to a defined maxium, in order to use all the available bandwidth and all the cloud or server resources available to minimize the load times.
+Smart Bulk Copy uses the [Bulk Copy API](https://docs.microsoft.com/en-us/dotnet/api/system.data.sqlclient.sqlbulkcopy) with parallel tasks. A source table is split into partitions, and each partition is copied in parallel with the others, up to a defined maximum, in order to use all the available bandwidth and all the available cloud or server resources to minimize load times.

 ### Partitioned Source Tables

@@ -16,7 +16,7 @@
 When a source table is partitioned, it uses the physical partitions to execute several queries like the following:

 ```sql
 SELECT * FROM <table> WHERE $partition.<partition-function>(<partition-column>) = <n>
 ```

-in parallel and to load, always in parallel, data into the destination table. `TABLOCK` options is used - when possible - on the table to allow fully parallelizable bulk inserts.
+Queries are executed in parallel to load data, also in parallel, into the destination table. The `TABLOCK` option is used - when possible - on the destination table to allow fully parallelizable bulk inserts. The `ORDER` option is also used, when possible, to minimize sort operations on the destination table when inserting into a table with an existing clustered rowstore index.

 ### Non-Partitioned Source Tables

@@ -24,7 +24,7 @@
 If a source table is not partitioned, then Smart Bulk Copy will use the `%%PhysLoc%%` virtual column to split it into logical partitions. More details on `%%PhysLoc%%` are available here:

 [Where is a record really located?](https://techcommunity.microsoft.com/t5/Premier-Field-Engineering/Where-is-a-record-really-located/ba-p/370972)

-If the configuration file specify a value greater than 1 for `logical-partitions` the following query will be used to read the logical partition in parallel:
+If the configuration file specifies a value greater than 1 for `logical-partitions`, the following query will be used to read the logical partitions in parallel:

 ```sql
 SELECT * FROM <table> WHERE ABS(CAST(%%PhysLoc%% AS BIGINT)) % <logical-partition-count> = <n>
 ```

@@ -36,9 +36,17 @@ SELECT * FROM <table> WHERE ABS(CAST(%%PhysLoc%% AS BIGINT)) % <logical-partition-count> = <n>
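To make the mechanics described in the README concrete, here is a minimal, self-contained sketch of the approach. This is illustrative only, not code from this repository: the connection strings, table name, and partition count are placeholders.

```csharp
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Data.SqlClient;

class LogicalPartitionCopySketch
{
    // Copies dbo.MyTable (placeholder) in N parallel streams using the
    // %%PhysLoc%% modulo trick described above.
    static async Task CopyAsync(string sourceCS, string destinationCS, int partitions = 4)
    {
        var tasks = Enumerable.Range(0, partitions).Select(async n =>
        {
            using var source = new SqlConnection(sourceCS);
            await source.OpenAsync();

            // One logical partition per task: same modulo, different remainder.
            var cmd = new SqlCommand(
                $"SELECT * FROM dbo.MyTable WHERE ABS(CAST(%%PhysLoc%% AS BIGINT)) % {partitions} = {n}",
                source);
            using var reader = await cmd.ExecuteReaderAsync();

            // TABLOCK allows fully parallel bulk inserts into a heap.
            using var bulk = new SqlBulkCopy(destinationCS, SqlBulkCopyOptions.TableLock);
            bulk.DestinationTableName = "dbo.MyTable";
            await bulk.WriteToServerAsync(reader);
        });

        await Task.WhenAll(tasks);
    }
}
```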
diff --git a/SmartBulkCopy.cs b/SmartBulkCopy.cs
--- a/SmartBulkCopy.cs
+++ b/SmartBulkCopy.cs
@@ … @@ public async Task<int> Copy(List<string> tablesToCopy)
         _tablesToCopy.AddRange(internalTablesToCopy);

         _logger.Info("Analyzing tables...");
+        var ticSource = new TablesInfoCollector(_config.SourceConnectionString, internalTablesToCopy, _logger);
+        var ticDestination = new TablesInfoCollector(_config.DestinationConnectionString, internalTablesToCopy, _logger);
+
+        var ti1 = ticSource.CollectTablesInfoAsync();
+        var ti2 = ticDestination.CollectTablesInfoAsync();
+
+        await Task.WhenAll(ti1, ti2);
+
+        var tiSource = await ti1;
+        var tiDestination = await ti2;
+
         var copyInfo = new List<CopyInfo>();

         foreach (var t in internalTablesToCopy)
         {
-            // By default try to use partitioned load
-            bool usePartitioning = true;
+            bool usePartitioning = false;
+            OrderHintType orderHintType = OrderHintType.None;
+
+            var sourceTable = tiSource.Find(p => p.TableName == t);
+            var destinationTable = tiDestination.Find(p => p.TableName == t);

             // Check if tables exist
-            if (!await CheckTableExistence(_config.SourceConnectionString, t))
+            if (sourceTable.Exists == false)
             {
                 _logger.Error($"Table {t} does not exist on source.");
                 return 1;
             }

-            if (!await CheckTableExistence(_config.DestinationConnectionString, t))
+            if (destinationTable.Exists == false)
             {
                 _logger.Error($"Table {t} does not exist on destination.");
                 return 1;
             }

-            // Check if there is a compatibile clustered index on the target table
-            // so that ordered bulk load could be used
-            var ciSource = await GetTableOrderInfo(_config.SourceConnectionString, t);
-            var ciDestination = await GetTableOrderInfo(_config.DestinationConnectionString, t);
-            OrderHintType orderHintType = OrderHintType.None;
-            if (ciSource.GetOrderBy() == ciDestination.GetOrderBy())
+            // TODO
+            // If destination table has secondary indexes then stop
+
+            // Check if partitioned load is possible
+            if (sourceTable.PrimaryIndex.IsPartitioned && destinationTable.PrimaryIndex is Heap)
             {
-                orderHintType = OrderHintType.ClusteredIndex;
-            }
-            else if (ciSource.GetPartitionOrderBy() == ciDestination.GetPartitionOrderBy() && !string.IsNullOrEmpty(ciSource.GetPartitionOrderBy()))
+                _logger.Info($"{t} |> Source is partitioned and destination is a heap. Parallel load enabled.");
+                _logger.Info($"{t} |> Partition By: {sourceTable.PrimaryIndex.GetPartitionBy()}");
+                usePartitioning = true;
+            } else if (sourceTable.PrimaryIndex is Heap && destinationTable.PrimaryIndex is Heap)
+            {
+                _logger.Info($"{t} |> Source and destination are not partitioned and both are heaps. Parallel load enabled.");
+                usePartitioning = true;
+            } else if (sourceTable.PrimaryIndex.IsPartitioned == false && destinationTable.PrimaryIndex is Heap)
             {
-                orderHintType = OrderHintType.PartionKeyOnly;
+                _logger.Info($"{t} |> Source is not partitioned but destination is a heap. Parallel load enabled.");
+                usePartitioning = true;
+            } else if (
+                (sourceTable.PrimaryIndex.IsPartitioned && destinationTable.PrimaryIndex.IsPartitioned) &&
+                (sourceTable.PrimaryIndex.GetPartitionBy() == destinationTable.PrimaryIndex.GetPartitionBy()) &&
+                (sourceTable.PrimaryIndex.GetOrderBy() == destinationTable.PrimaryIndex.GetOrderBy())
+            ) {
+                _logger.Info($"{t} |> Source and destination tables have compatible partitioning logic. Parallel load enabled.");
+                _logger.Info($"{t} |> Partition By: {sourceTable.PrimaryIndex.GetPartitionBy()}");
+                if (sourceTable.PrimaryIndex.GetOrderBy() != string.Empty) _logger.Info($"{t} |> Order By: {sourceTable.PrimaryIndex.GetOrderBy()}");
+                usePartitioning = true;
+            } else if (destinationTable.PrimaryIndex is ColumnStoreClusteredIndex)
+            {
+                _logger.Info($"{t} |> Destination is a ColumnStore. Parallel load enabled.");
+                usePartitioning = true;
+            }
+            else {
+                _logger.Info($"{t} |> Source and destination tables cannot be loaded in parallel.");
+                usePartitioning = false;
             }

-            // Check if table is partitioned
-            var isSourcePartitioned = await CheckIfTableIsPartitioned(_config.SourceConnectionString, t);
+            // Check if ORDER hint can be used to avoid sorting data on the destination
+            if (sourceTable.PrimaryIndex is RowStoreClusteredIndex && destinationTable.PrimaryIndex is RowStoreClusteredIndex)
+            {
+                if (sourceTable.PrimaryIndex.GetOrderBy() == destinationTable.PrimaryIndex.GetOrderBy())
+                {
+                    _logger.Info($"{t} |> Source and destination clustered rowstore indexes have the same ordering. Enabling ORDER hint.");
+                    orderHintType = OrderHintType.ClusteredIndex;
-            // If table is not partitined and has a clustered index,
-            // logical partitioning CANNOT be used
-            if (orderHintType == OrderHintType.ClusteredIndex && isSourcePartitioned == false)
+                }
+            }
+            if (sourceTable.PrimaryIndex is Heap && destinationTable.PrimaryIndex is Heap)
             {
-                _logger.Info($"Table {t} has a clustered index but no physical partitions. Forcing 1 logical partition and ordered load.");
-                usePartitioning = false;
-            }
+                if (sourceTable.PrimaryIndex.IsPartitioned && destinationTable.PrimaryIndex.IsPartitioned)
+                {
+                    _logger.Info($"{t} |> Source and destination are partitioned but not RowStores. Enabling ORDER hint on partition column.");
+                    orderHintType = OrderHintType.PartionKeyOnly;
+                }
+            }
+            if (sourceTable.PrimaryIndex is ColumnStoreClusteredIndex && destinationTable.PrimaryIndex is ColumnStoreClusteredIndex)
+            {
+                if (sourceTable.PrimaryIndex.IsPartitioned && destinationTable.PrimaryIndex.IsPartitioned)
+                {
+                    _logger.Info($"{t} |> Source and destination are partitioned but not RowStores. Enabling ORDER hint on partition column.");
+                    orderHintType = OrderHintType.PartionKeyOnly;
+                }
+            }

             // Use partitions if that makes sense
             var partitionType = "Unknown";
             if (usePartitioning == true)
             {
-                var tableSize = GetTableSize(t);
+                var tableSize = sourceTable.Size;

                 // Check if table is big enough to use partitions
                 if (tableSize.RowCount > _config.BatchSize || tableSize.SizeInGB > 1)
                 {
                     // Create the Work Info data based on partition type
-                    if (isSourcePartitioned)
+                    if (sourceTable.PrimaryIndex.IsPartitioned)
                     {
-                        var cis = CreatePhysicalPartitionedTableCopyInfo(t);
+                        var cis = CreatePhysicalPartitionedTableCopyInfo(sourceTable);
                         cis.ForEach(ci =>
                         {
-                            ci.OrderInfo = ciSource;
+                            ci.SourceTableInfo = sourceTable;
+                            ci.DestinationTableInfo = destinationTable;
                             ci.OrderHintType = orderHintType;
                         });
                         copyInfo.AddRange(cis);
@@ -180,15 +233,20 @@ public async Task<int> Copy(List<string> tablesToCopy)
                     }
                     else
                     {
-                        var cis = CreateLogicalPartitionedTableCopyInfo(t, tableSize);
-                        _logger.Info($"Table {t} is using logical partitioning: clustered index will be ignored, if present.");
+                        var cis = CreateLogicalPartitionedTableCopyInfo(sourceTable);
+                        cis.ForEach(ci =>
+                        {
+                            ci.SourceTableInfo = sourceTable;
+                            ci.DestinationTableInfo = destinationTable;
+                            ci.OrderHintType = orderHintType;
+                        });
                         copyInfo.AddRange(cis);
                         partitionType = "Logical";
                     }
                 }
                 else
                 {
-                    _logger.Info($"Table {t} is small, partitioned copy will not be used.");
+                    _logger.Info($"{t} |> Table is small, partitioned copy will not be used.");
                     usePartitioning = false;
                 }
             }
@@ -196,20 +254,18 @@ public async Task<int> Copy(List<string> tablesToCopy)
             // Otherwise just copy the table, possibly using
             // an ordered bulk copy
             if (usePartitioning == false)
-            {
-                var columns = GetColumnsForBulkCopy(t);
-                var ci = new NoPartitionsCopyInfo() { TableName = t };
-                ci.Columns.AddRange(columns);
-                if (orderHintType != OrderHintType.None) ci.OrderInfo = ciSource;
+            {
+                var ci = new NoPartitionsCopyInfo() { SourceTableInfo = sourceTable };
                 ci.OrderHintType = orderHintType;
+                ci.DestinationTableInfo = destinationTable;
                 copyInfo.Add(ci);
                 partitionType = "None";
             }

-            _logger.Info($"Table {t} analysis result: usePartioning={usePartitioning}, partitionType={partitionType}, orderHintType={orderHintType}");
+            _logger.Info($"{t} |> Analysis result: usePartitioning={usePartitioning}, partitionType={partitionType}, orderHintType={orderHintType}");
         }

-        _logger.Info("Enqueing work...");
+        _logger.Info("Enqueueing work...");
         copyInfo.ForEach(ci => _queue.Enqueue(ci));
         _logger.Info($"{_queue.Count} items enqueued.");

@@ -217,7 +273,7 @@ public async Task<int> Copy(List<string> tablesToCopy)
         {
             _logger.Info("Truncating destination tables...");
             internalTablesToCopy.ForEach(t => TruncateDestinationTable(t));
-        }
+        }

         _logger.Info($"Copying using {_config.MaxParallelTasks} parallel tasks.");
         var tasks = new List<Task>();
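The branching added above reduces to a compact rule set. As a reading aid only (same class names as this PR's `Index` hierarchy, introduced later in `TableInfoCollector.cs`, but this function is not part of the change):

```csharp
namespace SmartBulkCopy
{
    static class ParallelLoadRules
    {
        // Condensed restatement of the checks in Copy() above.
        public static bool CanLoadInParallel(Index source, Index destination)
        {
            // Any source layout can be streamed in parallel into a heap.
            if (destination is Heap) return true;

            // A clustered columnstore destination also accepts parallel loads.
            if (destination is ColumnStoreClusteredIndex) return true;

            // Otherwise both sides must share the same partitioning and ordering,
            // so each task maps to exactly one source/destination partition pair.
            return source.IsPartitioned
                && destination.IsPartitioned
                && source.GetPartitionBy() == destination.GetPartitionBy()
                && source.GetOrderBy() == destination.GetOrderBy();
        }
    }
}
```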
@@ -271,72 +327,7 @@ public async Task<int> Copy(List<string> tablesToCopy)
         }

         return result;
-    }
-
-    private async Task<TableOrderInfo> GetTableOrderInfo(string connectionString, string tableName)
-    {
-        var sqsb = new SqlConnectionStringBuilder(connectionString);
-
-        var result = new TableOrderInfo();
-
-        string sql = @"
-            select
-                c.name as ColumnName,
-                ic.key_ordinal as OrdinalPosition,
-                ic.is_descending_key as IsDescending,
-                ic.partition_ordinal
-            from
-                sys.indexes i
-            inner join
-                sys.index_columns ic on ic.index_id = i.index_id and ic.[object_id] = i.[object_id]
-            inner join
-                sys.columns c on ic.column_id = c.column_id and ic.[object_id] = c.[object_id]
-            where
-                i.[object_id] = object_id(@tableName)
-            and
-                i.[type] in (0,1)
-            order by
-                ic.key_ordinal,
-                ic.partition_ordinal
-        ";
-
-        _logger.Debug($"Executing: {sql}, @tableName = {tableName}, @server = {sqsb.DataSource}");
-
-        var conn = new SqlConnection(connectionString);
-        var qr = await conn.QueryAsync<SortColumn>(sql, new { @tableName = tableName });
-
-        result.SortColumns.AddRange(qr.ToList());
-
-        if (result.SortColumns.Count > 0)
-        {
-            _logger.Debug($"Detected Clustered Index on {tableName}@{sqsb.DataSource}: {result.GetOrderBy()}");
-        }
-
-        return result;
-    }
-
-    private TableSize GetTableSize(string tableName)
-    {
-        string sql = @"
-            select
-                sum(row_count) as row_count,
-                (sum(used_page_count) * 8) / 1024. / 1024 as size_gb
-            from
-                sys.dm_db_partition_stats
-            where
-                [object_id] = object_id(@tableName)
-            and
-                index_id in (0, 1)
-            group by
-                [object_id]
-        ";
-
-        _logger.Debug($"Executing: {sql}, @tableName = {tableName}");
-
-        var conn = new SqlConnection(_config.SourceConnectionString);
-        var qr = conn.QuerySingle(sql, new { @tableName = tableName });
-        return new TableSize() { RowCount = (long)(qr.row_count), SizeInGB = (int)(qr.size_gb) };
-    }
+    }

     private bool CheckDatabaseSnapshot()
     {
@@ -394,31 +385,7 @@ group by
         }

         return result;
-    }
-
-    private async Task<bool> CheckIfTableIsPartitioned(string connectionString, string tableName)
-    {
-        var sqsb = new SqlConnectionStringBuilder(connectionString);
-
-        var conn = new SqlConnection(connectionString);
-
-        string sql = @"
-            select
-                IsPartitioned = case when count(*) > 1 then 1 else 0 end
-            from
-                sys.dm_db_partition_stats
-            where
-                [object_id] = object_id(@tableName)
-            and
-                index_id in (0,1)
-        ";
-
-        _logger.Debug($"Executing: {sql}, @tableName = {tableName}, @server = {sqsb.DataSource}");
-
-        var isPartitioned = await conn.ExecuteScalarAsync<int>(sql, new { @tableName = tableName });
-
-        return (isPartitioned == 1);
-    }
+    }

     private void TruncateDestinationTable(string tableName)
     {
@@ -427,33 +394,10 @@ private void TruncateDestinationTable(string tableName)
         destinationConnection.ExecuteScalar($"TRUNCATE TABLE {tableName}");
     }

-    private List<string> GetColumnsForBulkCopy(string tableName)
+    private List<CopyInfo> CreatePhysicalPartitionedTableCopyInfo(TableInfo ti)
     {
-        _logger.Debug($"Creating column list for {tableName}...");
-        var conn = new SqlConnection(_config.SourceConnectionString);
-
-        var sql = $@"
-            select
-                [name]
-            from
-                sys.columns
-            where
-                [object_id] = object_id(@tableName)
-            and
-                [is_computed] = 0
-            and
-                [is_column_set] = 0
-        ";
+        string tableName = ti.TableName;

-        _logger.Debug($"Executing: {sql}, @tableName = {tableName}");
-
-        var columns = conn.Query<string>(sql, new { @tableName = tableName });
-
-        return columns.ToList();
-    }
-
-    private List<CopyInfo> CreatePhysicalPartitionedTableCopyInfo(string tableName)
-    {
         var copyInfo = new List<CopyInfo>();

         var conn = new SqlConnection(_config.SourceConnectionString);
@@ -473,7 +417,7 @@ index_id in (0,1)
         ";

         var partitionCount = (int)conn.ExecuteScalar(sql1, new { @tableName = tableName });

-        _logger.Info($"Table {tableName} is partitioned. Bulk copy will be parallelized using {partitionCount} partition(s).");
+        _logger.Info($"{tableName} |> Table is partitioned. Bulk copy will be parallelized using {partitionCount} partition(s).");

         var sql2 = $@"
             select
@@ -502,16 +446,13 @@ i.index_id in (0,1)
         ";

         var partitionInfo = conn.QuerySingle(sql2, new { @tableName = tableName });

-        var columns = GetColumnsForBulkCopy(tableName);
-
         foreach (var n in Enumerable.Range(1, partitionCount))
         {
             var cp = new PhysicalPartitionCopyInfo();
             cp.PartitionNumber = n;
-            cp.TableName = tableName;
+            cp.SourceTableInfo = ti;
             cp.PartitionColumn = partitionInfo.PartitionColumn;
-            cp.PartitionFunction = partitionInfo.PartitionFunction;
-            cp.Columns.AddRange(columns);
+            cp.PartitionFunction = partitionInfo.PartitionFunction;

             copyInfo.Add(cp);
         }
@@ -519,15 +460,16 @@
         return copyInfo;
     }

-    private List<CopyInfo> CreateLogicalPartitionedTableCopyInfo(string tableName, TableSize tableSize)
+    private List<CopyInfo> CreateLogicalPartitionedTableCopyInfo(TableInfo ti)
     {
-        var copyInfo = new List<CopyInfo>();
+        string tableName = ti.TableName;
+        TableSize tableSize = ti.Size;

-        var columns = GetColumnsForBulkCopy(tableName);
+        var copyInfo = new List<CopyInfo>();

         long partitionCount = 1;

-        _logger.Debug($"Table {tableName}: RowCount={tableSize.RowCount}, SizeInGB={tableSize.SizeInGB}");
+        _logger.Debug($"{tableName}: RowCount={tableSize.RowCount}, SizeInGB={tableSize.SizeInGB}");

         switch (_config.LogicalPartitioningStrategy)
         {
@@ -557,15 +499,14 @@ private List<CopyInfo> CreateLogicalPartitionedTableCopyInfo(TableInfo ti)
         var ps = (double)tableSize.SizeInGB / (double)partitionCount;
         var pc = (double)tableSize.RowCount / (double)partitionCount;

-        _logger.Info($"Table {tableName} is not partitioned. Bulk copy will be parallelized using {partitionCount} logical partitions (Size: {ps:0.00} GB, Rows: {pc:0.00}).");
+        _logger.Info($"{tableName} |> Source table is not partitioned. Bulk copy will be parallelized using {partitionCount} logical partitions (Logical partition size: {ps:0.00} GB, Rows: {pc:0.00}).");

         foreach (var n in Enumerable.Range(1, (int)partitionCount))
         {
             var cp = new LogicalPartitionCopyInfo();
             cp.PartitionNumber = n;
-            cp.TableName = tableName;
+            cp.SourceTableInfo = ti;
             cp.LogicalPartitionsCount = (int)partitionCount;
-            cp.Columns.AddRange(columns);

             copyInfo.Add(cp);
         }

         return copyInfo;
     }

@@ -598,19 +539,22 @@ private void BulkCopy(int taskId)
         {
             whereClause = $" WHERE {predicate}";
         };

-        var orderBy = copyInfo.GetOrderBy();
-        if (!string.IsNullOrEmpty(orderBy))
+        var orderBy = "";
+        if (copyInfo.OrderHintType != OrderHintType.None)
         {
-            orderBy = $" ORDER BY {orderBy}";
-        };
+            orderBy = copyInfo.GetOrderBy();
+            if (!string.IsNullOrEmpty(orderBy))
+            {
+                orderBy = $" ORDER BY {orderBy}";
+            };
+        }

         var sql = $"SELECT {copyInfo.GetSelectList()} FROM {copyInfo.TableName}{whereClause}{orderBy}";

         var options = SqlBulkCopyOptions.KeepIdentity | SqlBulkCopyOptions.KeepNulls;

-        // Tablock should be used only if using logical partitioning
-        // and NO indexes are detected/used
-        if (copyInfo.OrderHintType != OrderHintType.ClusteredIndex)
+        // TABLOCK can be used only if the target is a HEAP
+        if (copyInfo.DestinationTableInfo.PrimaryIndex is Heap)
         {
             options |= SqlBulkCopyOptions.TableLock;
             _logger.Debug($"Task {taskId}: Using TABLOCK");
         }
@@ -661,26 +605,34 @@ private void BulkCopy(int taskId)
         {
             bulkCopy.ColumnMappings.Add(c, c);
         }
-        if (copyInfo.OrderInfo.IsFound)
+
+        if (copyInfo.OrderHintType == OrderHintType.ClusteredIndex)
         {
-            _logger.Debug($"Task {taskId}: Adding OrderHints ({copyInfo.OrderHintType}).");
-            if (copyInfo.OrderHintType == OrderHintType.ClusteredIndex)
+            _logger.Debug($"Task {taskId}: Adding OrderHints ({copyInfo.OrderHintType}).");
+            var oc = copyInfo.SourceTableInfo.PrimaryIndex.Columns.OrderBy(c => c.OrdinalPosition);
+            foreach (var ii in oc)
             {
-                foreach (var ii in copyInfo.OrderInfo.SortColumns)
-                {
-                    bulkCopy.ColumnOrderHints.Add(ii.ColumnName, ii.IsDescending ? SortOrder.Descending : SortOrder.Ascending);
-                }
+                bulkCopy.ColumnOrderHints.Add(ii.ColumnName, ii.IsDescending ? SortOrder.Descending : SortOrder.Ascending);
             }
-            if (copyInfo.OrderHintType == OrderHintType.PartionKeyOnly)
+        }
+        if (copyInfo.OrderHintType == OrderHintType.PartionKeyOnly)
+        {
+            _logger.Debug($"Task {taskId}: Adding OrderHints ({copyInfo.OrderHintType}).");
+            var oc = copyInfo.SourceTableInfo.PrimaryIndex.Columns.Where(c => c.PartitionOrdinal != 0);
+            foreach (var ii in oc)
             {
-                var oc = copyInfo.OrderInfo.SortColumns.Where(c => c.OrdinalPosition == 0);
-                foreach (var ii in oc)
-                {
-                    bulkCopy.ColumnOrderHints.Add(ii.ColumnName, ii.IsDescending ? SortOrder.Descending : SortOrder.Ascending);
-                }
+                bulkCopy.ColumnOrderHints.Add(ii.ColumnName, ii.IsDescending ? SortOrder.Descending : SortOrder.Ascending);
             }
         }
-        bulkCopy.BatchSize = _config.BatchSize;
+
+        if (copyInfo.DestinationTableInfo.PrimaryIndex is ColumnStoreClusteredIndex)
+        {
+            // Make sure the ColumnStore will have as few rowgroups as possible
+            bulkCopy.BatchSize = 1048576;
+        } else {
+            bulkCopy.BatchSize = _config.BatchSize;
+        }
+
         bulkCopy.WriteToServer(sourceReader);
         attempts = int.MaxValue;
         taskTran.Commit();
@@ -891,36 +843,6 @@ END CATCH
         return result;
     }
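For context on the ORDER hint wiring above: Microsoft.Data.SqlClient exposes it through `SqlBulkCopy.ColumnOrderHints`. A minimal, standalone illustration (placeholder table and column names, not repository code) — telling `SqlBulkCopy` the incoming stream is already sorted lets the server skip re-sorting when the destination has a matching clustered index:

```csharp
using System.Data;
using Microsoft.Data.SqlClient;

static class OrderHintSketch
{
    public static void BulkCopyWithOrderHint(string destinationConnectionString, IDataReader sortedReader)
    {
        var options = SqlBulkCopyOptions.KeepIdentity | SqlBulkCopyOptions.KeepNulls;

        using var bulkCopy = new SqlBulkCopy(destinationConnectionString, options);
        bulkCopy.DestinationTableName = "dbo.MyTable"; // placeholder

        // The hints must match the destination clustered index key order exactly.
        bulkCopy.ColumnOrderHints.Add("col17", SortOrder.Ascending);
        bulkCopy.ColumnOrderHints.Add("col19", SortOrder.Ascending);

        bulkCopy.WriteToServer(sortedReader);
    }
}
```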
-    private async Task<bool> CheckTableExistence(string connectionString, string tableName)
-    {
-        bool result = false;
-        var conn = new SqlConnection(connectionString);
-        try
-        {
-            await conn.QuerySingleAsync<string>(@"select
-                    [FullName] = QUOTENAME(s.[name]) + '.' + QUOTENAME(t.[name])
-                from
-                    sys.tables t
-                inner join
-                    sys.schemas s on t.[schema_id] = s.[schema_id]
-                where
-                    s.[name] = PARSENAME(@tableName, 2)
-                and
-                    t.[name] = PARSENAME(@tableName, 1)", new { @tableName = tableName });
-            result = true;
-        }
-        catch (InvalidOperationException)
-        {
-            result = false;
-        }
-        finally
-        {
-            conn.Close();
-        }
-
-        return result;
-    }
-
     private List<string> GetTablesToCopy(IEnumerable<string> sourceList)
     {
         var conn = new SqlConnection(_config.SourceConnectionString);
@@ -930,7 +852,7 @@ private List<string> GetTablesToCopy(IEnumerable<string> sourceList)
         {
             if (t.Contains("*"))
             {
-                _logger.Info("Getting list of tables to copy...");
+                _logger.Info($"Wildcard found: '{t}'. Getting list of tables to copy...");
                 var tables = conn.Query(@"
                     select
                         [Name] = QUOTENAME(s.[name]) + '.' + QUOTENAME(t.[name])
                     from
                         sys.tables t
                     inner join
@@ -951,7 +873,7 @@ inner join
                 bool matches = Regex.IsMatch(tb.Name.Replace("[", "").Replace("]", ""), regExPattern); // TODO: Improve wildcard matching
                 if (matches)
                 {
-                    _logger.Info($"Adding {tb.Name}...");
+                    _logger.Info($"Adding via wildcard {tb.Name}...");
                     internalTablesToCopy.Add(tb.Name);
                 }
             }
@@ -965,11 +887,5 @@

         return internalTablesToCopy;
     }
-
-    private class TableSize
-    {
-        public long RowCount;
-        public int SizeInGB;
-    }
 }
}
\ No newline at end of file
diff --git a/SmartBulkCopy.csproj b/SmartBulkCopy.csproj
index a66487c..758fee4 100644
--- a/SmartBulkCopy.csproj
+++ b/SmartBulkCopy.csproj
@@ -3,14 +3,14 @@
     <AssemblyName>SmartBulkCopy</AssemblyName>
    <OutputType>Exe</OutputType>
-    <TargetFramework>netcoreapp2.1</TargetFramework>
+    <TargetFramework>netcoreapp3.1</TargetFramework>
     <LangVersion>latest</LangVersion>
-    <Version>1.6.1</Version>
+    <Version>1.7.1</Version>
     <Authors>Davide Mauri</Authors>
     <Product>Smart Bulk Copy</Product>
     <PackageId>SmartBulkCopy</PackageId>
-    <AssemblyVersion>1.6.1</AssemblyVersion>
-    <FileVersion>1.6.1</FileVersion>
+    <AssemblyVersion>1.7.1</AssemblyVersion>
+    <FileVersion>1.7.1</FileVersion>
     <NeutralLanguage>en</NeutralLanguage>
     <Description>Efficiently copy database tables from one SQL Server/Azure SQL database to another</Description>
     <Company>Davide Mauri</Company>
@@ -22,7 +22,7 @@
-    <PackageReference … />
+    <PackageReference … />
@@ -34,6 +34,9 @@
+    <None Update="…">
+      <CopyToOutputDirectory>Always</CopyToOutputDirectory>
+    </None>
     <None Update="…">
       <CopyToOutputDirectory>Always</CopyToOutputDirectory>
     </None>
diff --git a/SmartBulkCopyConfig.cs b/SmartBulkCopyConfig.cs
index d4de7ef..fd71c8b 100644
--- a/SmartBulkCopyConfig.cs
+++ b/SmartBulkCopyConfig.cs
@@ -112,6 +112,13 @@ public static SmartBulkCopyConfiguration LoadFromConfigFile()

     public static SmartBulkCopyConfiguration LoadFromConfigFile(string configFile)
     {
+        // If config file is not found, automatically add the .json extension
+        if (!File.Exists(Path.Combine(Directory.GetCurrentDirectory(), configFile)))
+        {
+            if (Path.GetExtension(configFile) != ".json")
+                configFile += ".json";
+        }
+
         var config = new ConfigurationBuilder()
             .SetBasePath(Directory.GetCurrentDirectory())
             .AddJsonFile(configFile, optional: false, reloadOnChange: false)
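The configuration-loading tweak above means the config file can now be referenced without its `.json` extension: for example, `dotnet SmartBulkCopy.dll myconfig` will fall back to `myconfig.json` when `myconfig` does not exist on disk. A standalone restatement of the fallback (a hypothetical helper, not part of the PR):

```csharp
using System.IO;

static class ConfigFileResolver
{
    // "myconfig" resolves to "myconfig.json" when the bare name is missing.
    public static string Resolve(string baseDir, string configFile)
    {
        if (!File.Exists(Path.Combine(baseDir, configFile)) &&
            Path.GetExtension(configFile) != ".json")
        {
            configFile += ".json";
        }
        return configFile;
    }
}
```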
diff --git a/TableInfoCollector.cs b/TableInfoCollector.cs
new file mode 100644
index 0000000..cc9eb7c
--- /dev/null
+++ b/TableInfoCollector.cs
@@ -0,0 +1,451 @@
+using System;
+using System.IO;
+using System.Collections.Generic;
+using System.Collections.Concurrent;
+using System.Data;
+using System.Diagnostics;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Configuration;
+using Microsoft.Extensions.Configuration.Json;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Data.SqlClient;
+using Dapper;
+using NLog;
+
+namespace SmartBulkCopy
+{
+    public class TableSize
+    {
+        public long RowCount = 0;
+        public int SizeInGB = 0;
+    }
+
+    public class Column {
+        public string ColumnName;
+    }
+
+    public class IndexColumn: Column
+    {
+        public int OrdinalPosition;
+        public bool IsDescending;
+        public int PartitionOrdinal;
+    }
+
+    public abstract class Index
+    {
+        public List<IndexColumn> Columns = new List<IndexColumn>();
+
+        public virtual string GetOrderBy(bool excludePartitionColumn = true) {
+            return string.Empty;
+        }
+
+        public virtual string GetPartitionBy() {
+            return string.Empty;
+        }
+
+        public virtual bool IsPartitioned { get; }
+    }
+
+    public class UnknownIndex: Index
+    { }
+
+    public class Heap: Index
+    {
+        public override string GetPartitionBy()
+        {
+            if (this.Columns.Count() == 1) return this.Columns.First().ColumnName;
+            return string.Empty;
+        }
+
+        public override string GetOrderBy(bool excludePartitionColumn = true)
+        {
+            if (excludePartitionColumn == true) return string.Empty;
+
+            return GetPartitionBy();
+        }
+
+        public override bool IsPartitioned {
+            get {
+                return this.Columns.Count() == 1;
+            }
+        }
+    }
+
+    public class RowStoreClusteredIndex: Index
+    {
+        public override string GetOrderBy(bool excludePartitionColumn = true)
+        {
+            int op = -1;
+            if (excludePartitionColumn == true) op = 1;
+
+            var orderList = from c in Columns
+                            where c.PartitionOrdinal != op
+                            orderby c.OrdinalPosition
+                            select c.ColumnName + (c.IsDescending == true ? " DESC" : "");
+
+            return string.Join(",", orderList);
+        }
+
+        public override string GetPartitionBy()
+        {
+            var orderList = from c in Columns
+                            where c.PartitionOrdinal != 0
+                            orderby c.PartitionOrdinal
+                            select c.ColumnName;
+
+            return string.Join(",", orderList);
+        }
+
+        public override bool IsPartitioned {
+            get {
+                return Columns.Any(c => c.PartitionOrdinal != 0);
+            }
+        }
+    }
+
+    public class ColumnStoreClusteredIndex: Index
+    {
+        public override string GetPartitionBy()
+        {
+            if (this.Columns.Count() == 1) return this.Columns.First().ColumnName;
+            return string.Empty;
+        }
+
+        public override string GetOrderBy(bool excludePartitionColumn = true)
+        {
+            if (excludePartitionColumn == true) return string.Empty;
+
+            return GetPartitionBy();
+        }
+
+        public override bool IsPartitioned {
+            get {
+                return this.Columns.Count() == 1;
+            }
+        }
+    }
+
+    public class TableInfo
+    {
+        public readonly string ServerName;
+        public readonly string DatabaseName;
+        public readonly string TableName;
+        public bool Exists;
+        public Index PrimaryIndex = new UnknownIndex();
+        public List<string> Columns = new List<string>();
+        public string TableLocation => $"{DatabaseName}.{TableName}@{ServerName}";
+        public TableSize Size = new TableSize();
+
+        public TableInfo(string serverName, string databaseName, string tableName)
+        {
+            this.ServerName = serverName;
+            this.DatabaseName = databaseName;
+            this.TableName = tableName;
+        }
+
+        public override string ToString()
+        {
+            return TableLocation;
+        }
+    }
+
+    public class UnknownTableInfo: TableInfo
+    {
+        public UnknownTableInfo(): base("Unknown", "Unknown", "Unknown") { }
+    }
+
+    public class TablesInfoCollector
+    {
+        private readonly ILogger _logger;
+        private readonly List<string> _tablesToAnalyze = new List<string>();
+        private readonly string _connectionString;
+
+        public TablesInfoCollector(string connectionString, List<string> tablesToAnalyze, ILogger logger)
+        {
+            this._connectionString = connectionString;
+            this._tablesToAnalyze.AddRange(tablesToAnalyze);
+            this._logger = logger;
+        }
+
+        public async Task<List<TableInfo>> CollectTablesInfoAsync()
+        {
+            var result = new List<TableInfo>();
+
+            var collector = new TableInfoCollector(_connectionString, _logger);
+
+            foreach(var ta in _tablesToAnalyze)
+            {
+                var ti = await collector.CollectAsync(ta);
+                result.Add(ti);
+            }
+
+            return result;
+        }
+    }
+
+    public class TableInfoCollector
+    {
+        private readonly ILogger _logger;
+        private readonly SqlConnection _conn;
+        private readonly string _serverName;
+        private readonly string _databaseName;
+        private TableInfo _tableInfo;
+
+        public TableInfoCollector(string connectionString, ILogger logger)
+        {
+            var sqsb = new SqlConnectionStringBuilder(connectionString);
+            this._serverName = sqsb.DataSource;
+            this._databaseName = sqsb.InitialCatalog;
+            this._conn = new SqlConnection(connectionString);
+            this._logger = logger;
+        }
+
+        public async Task<TableInfo> CollectAsync(string tableName)
+        {
+            _tableInfo = new TableInfo(this._serverName, this._databaseName, tableName);
+
+            await CheckTableExistenceAsync();
+            if (_tableInfo.Exists == false) return _tableInfo;
+
+            // Get info on primary index
+            await GetHeapInfoAsync();
+            await GetRowStoreClusteredInfoAsync();
+            await GetColumnStoreClusteredInfoAsync();
+
+            // Check if secondary index exists
+            // TODO -> Is really needed?
+
+            await GetTableSizeAsync();
+            await GetColumnsForBulkCopyAsync();
+
+            return _tableInfo;
+        }
+
+        private async Task CheckTableExistenceAsync()
+        {
+            bool result = false;
+            try
+            {
+                var tableName = await _conn.QuerySingleOrDefaultAsync<string>(@"select
+                        [FullName] = QUOTENAME(s.[name]) + '.' + QUOTENAME(t.[name])
+                    from
+                        sys.tables t
+                    inner join
+                        sys.schemas s on t.[schema_id] = s.[schema_id]
+                    where
+                        s.[name] = PARSENAME(@tableName, 2)
+                    and
+                        t.[name] = PARSENAME(@tableName, 1)", new { @tableName = _tableInfo.TableName });
+
+                if (tableName != default(string)) result = true;
+            }
+            catch (InvalidOperationException)
+            {
+                result = false;
+            }
+            finally
+            {
+                _conn.Close();
+            }
+
+            _tableInfo.Exists = result;
+        }
+
+        private async Task GetRowStoreClusteredInfoAsync()
+        {
+            if (!(_tableInfo.PrimaryIndex is UnknownIndex)) return;
+
+            string sql = @"
+                select
+                    c.name as ColumnName,
+                    ic.key_ordinal as OrdinalPosition,
+                    ic.is_descending_key as IsDescending,
+                    ic.partition_ordinal as PartitionOrdinal
+                from
+                    sys.indexes i
+                inner join
+                    sys.index_columns ic on ic.index_id = i.index_id and ic.[object_id] = i.[object_id]
+                inner join
+                    sys.columns c on ic.column_id = c.column_id and ic.[object_id] = c.[object_id]
+                where
+                    i.[object_id] = object_id(@tableName)
+                and
+                    i.[type] in (1)
+                order by
+                    ic.key_ordinal,
+                    ic.partition_ordinal
+            ";
+
+            LogDebug($"Collecting Clustered RowStore Info. Executing:\n{sql}");
+
+            var qr = await _conn.QueryAsync<IndexColumn>(sql, new { @tableName = _tableInfo.TableName });
+
+            var rci = new RowStoreClusteredIndex();
+            rci.Columns.AddRange(qr.ToList());
+
+            if (rci.Columns.Count > 0)
+            {
+                LogDebug($"Detected Clustered RowStore Index: {rci.GetOrderBy()}");
+                _tableInfo.PrimaryIndex = rci;
+
+                if (rci.Columns.Any(c => c.PartitionOrdinal != 0)) {
+                    LogDebug($"Clustered RowStore Index is Partitioned on: {rci.GetPartitionBy()}");
+                }
+            }
+        }
+
+        private async Task GetHeapInfoAsync()
+        {
+            if (!(_tableInfo.PrimaryIndex is UnknownIndex)) return;
+
+            string sql = @"
+                select
+                    c.name as ColumnName,
+                    ic.key_ordinal as OrdinalPosition,
+                    ic.is_descending_key as IsDescending,
+                    ic.partition_ordinal as PartitionOrdinal
+                from
+                    sys.indexes i
+                left join
+                    sys.index_columns ic on ic.index_id = i.index_id and ic.[object_id] = i.[object_id]
+                left join
+                    sys.columns c on ic.column_id = c.column_id and ic.[object_id] = c.[object_id]
+                where
+                    i.[object_id] = object_id(@tableName)
+                and
+                    i.[type] in (0)
+                and
+                    (ic.partition_ordinal != 0 or ic.partition_ordinal is null)
+            ";
+
+            LogDebug($"Collecting Heap Info. Executing:\n{sql}");
+
+            var columns = (await _conn.QueryAsync<IndexColumn>(sql, new { @tableName = _tableInfo.TableName })).ToList();
+
+            if (columns.Count() == 1)
+            {
+                var h = new Heap();
+                if (columns.First().ColumnName != null)
+                {
+                    h.Columns.Add(columns.First());
+                    LogDebug($"Heap is Partitioned on: {h.GetPartitionBy()}");
+                }
+
+                _tableInfo.PrimaryIndex = h;
+            }
+        }
+
+        private async Task GetColumnStoreClusteredInfoAsync()
+        {
+            if (!(_tableInfo.PrimaryIndex is UnknownIndex)) return;
+
+            string sql = @"
+                with cte as
+                (
+                    select
+                        1 as sortKey,
+                        c.name as ColumnName,
+                        ic.key_ordinal as OrdinalPosition,
+                        ic.is_descending_key as IsDescending,
+                        ic.partition_ordinal as PartitionOrdinal
+                    from
+                        sys.indexes i
+                    left join
+                        sys.index_columns ic on ic.index_id = i.index_id and ic.[object_id] = i.[object_id]
+                    left join
+                        sys.columns c on ic.column_id = c.column_id and ic.[object_id] = c.[object_id]
+                    where
+                        i.[object_id] = object_id(@tableName)
+                    and
+                        i.[type] in (5)
+                    and
+                        (ic.partition_ordinal != 0 or ic.partition_ordinal is null)
+
+                    union
+
+                    select
+                        2 as sortKey,
+                        null as ColumnName,
+                        null as OrdinalPosition,
+                        null as IsDescending,
+                        null as PartitionOrdinal
+                    from
+                        sys.indexes i
+                    where
+                        i.[object_id] = object_id(@tableName)
+                    and
+                        i.[type] in (5)
+                )
+                select top(1) * from cte order by sortKey
+            ";
+
+            LogDebug($"Collecting Clustered ColumnStore Info. Executing:\n{sql}");
+
+            var columns = (await _conn.QueryAsync<IndexColumn>(sql, new { @tableName = _tableInfo.TableName })).ToList();
+
+            if (columns.Count() == 1)
+            {
+                var h = new ColumnStoreClusteredIndex();
+
+                if (columns[0].ColumnName != null)
+                {
+                    h.Columns.Add(columns[0]);
+                    LogDebug($"Clustered ColumnStore is Partitioned on: {h.GetPartitionBy()}");
+                }
+
+                _tableInfo.PrimaryIndex = h;
+            }
+        }
+
+        private async Task GetTableSizeAsync()
+        {
+            string sql = @"
+                select
+                    sum(row_count) as [RowCount],
+                    cast((sum(used_page_count) * 8) / 1024. / 1024. as int) as SizeInGB
+                from
+                    sys.dm_db_partition_stats
+                where
+                    [object_id] = object_id(@tableName)
+                and
+                    index_id in (0, 1, 5)
+                group by
+                    [object_id]
+            ";
+
+            LogDebug($"Executing:\n{sql}");
+
+            _tableInfo.Size = await _conn.QuerySingleAsync<TableSize>(sql, new { @tableName = _tableInfo.TableName });
+        }
+
+        private async Task GetColumnsForBulkCopyAsync()
+        {
+            LogDebug($"Creating column list...");
+
+            var sql = $@"
+                select
+                    [name]
+                from
+                    sys.columns
+                where
+                    [object_id] = object_id(@tableName)
+                and
+                    [is_computed] = 0
+                and
+                    [is_column_set] = 0
+            ";
+
+            LogDebug($"Executing:\n{sql}");
+
+            var columns = await _conn.QueryAsync<string>(sql, new { @tableName = _tableInfo.TableName });
+            _tableInfo.Columns = columns.ToList();
+        }
+
+        private void LogDebug(string message)
+        {
+            _logger.Debug($"[{_tableInfo.TableLocation}] {message}");
+        }
+    }
+}
\ No newline at end of file
diff --git a/sql/SQLQuery1.sql b/sql/SQLQuery1.sql
new file mode 100644
index 0000000..9a86c99
--- /dev/null
+++ b/sql/SQLQuery1.sql
@@ -0,0 +1,111 @@
+CREATE SCHEMA [schema1]
+GO
+CREATE SCHEMA [schema2]
+GO
+CREATE SCHEMA [schema3]
+GO
+CREATE SCHEMA [schema4]
+GO
+CREATE SCHEMA [schema5]
+GO
+CREATE SCHEMA [schema6]
+GO
+CREATE SCHEMA [schema7]
+GO
+CREATE SCHEMA [schema8]
+GO
+CREATE SCHEMA [schema9]
+GO
+
+DROP TABLE IF EXISTS [schema1].[heap];
+CREATE TABLE [schema1].[heap]
+(
+    id INT IDENTITY NOT NULL,
+    col1 VARCHAR(100) NOT NULL,
+    col3 VARCHAR(100) NULL,
+    col2 NVARCHAR(100) NOT NULL,
+    col4 NVARCHAR(100) NULL,
+    col5 TINYINT NOT NULL,
+    col6 TINYINT NULL,
+    col7 SMALLINT NOT NULL,
+    col8 SMALLINT NULL,
+    col9 INT NOT NULL,
+    col10 INT NULL,
+    col11 BIGINT NOT NULL,
+    col12 BIGINT NULL,
+    col13 SMALLDATETIME NOT NULL,
+    col14 SMALLDATETIME NULL,
+    col15 DATETIME NOT NULL,
+    col16 DATETIME NULL,
+    col17 DATE NOT NULL,
+    col18 DATE NULL,
+    col19 TIME(7) NOT NULL,
+    col20 TIME(7) NULL,
+    col21 DATETIME2(7) NOT NULL,
+    col22 DATETIME2(7) NULL,
+    col23 DATETIMEOFFSET NOT NULL,
+    col24 DATETIMEOFFSET NULL,
+    col25 SMALLMONEY NOT NULL,
+    col26 SMALLMONEY NULL,
+    col27 MONEY NOT NULL,
+    col28 MONEY NULL,
+    col29 DECIMAL(18,8) NOT NULL,
+    col30 DECIMAL(18,8) NULL,
+    col31 FLOAT NOT NULL,
+    col32 FLOAT NULL,
+    col33 TIMESTAMP NOT NULL
+)
+GO
+
+SELECT * INTO [schema1].[clustered_columnstore] FROM [schema1].[heap]
+SELECT * INTO [schema1].[clustered_rowstore] FROM [schema1].[heap]
+SELECT * INTO [schema1].[mix] FROM [schema1].[heap]
+SELECT * INTO [schema1].[nonclustered_columnstore] FROM [schema1].[heap]
+SELECT * INTO [schema1].[nonclustered_rowstore] FROM [schema1].[heap]
+SELECT * INTO [schema1].[partitioned_clustered_columnstore] FROM [schema1].[heap]
+SELECT * INTO [schema1].[partitioned_clustered_rowstore] FROM [schema1].[heap]
+SELECT * INTO [schema1].[partitioned_mix] FROM [schema1].[heap]
+SELECT * INTO [schema1].[partitioned_nonclustered_columnstore] FROM [schema1].[heap]
+SELECT * INTO [schema1].[partitioned_nonclustered_rowstore] FROM [schema1].[heap]
+SELECT * INTO [schema1].[partitioned_heap] FROM [schema1].[heap]
+go
+
+CREATE PARTITION FUNCTION pf_dummy (int)
+AS RANGE LEFT FOR VALUES (1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000);
+GO
+
+CREATE PARTITION SCHEME ps_dummy
+AS PARTITION pf_dummy
+ALL TO ([Primary])
+GO
+
+CREATE CLUSTERED INDEX ixc_dummy ON [schema1].partitioned_heap (id) ON ps_dummy(id)
+GO
+DROP INDEX ixc_dummy ON [schema1].partitioned_heap
+GO
+
+CREATE CLUSTERED INDEX ixc_dummy ON [schema1].partitioned_clustered_rowstore (col17, col19) ON ps_dummy(id)
+GO
+
+CREATE CLUSTERED INDEX ixc_dummy ON [schema1].clustered_rowstore (col17, col19)
+GO
+
+ALTER TABLE [schema1].partitioned_clustered_columnstore
+DROP COLUMN col33;
+GO
+
+CREATE CLUSTERED INDEX ixc_dummy ON [schema1].partitioned_clustered_columnstore (id) ON ps_dummy(id)
+GO
+DROP INDEX ixc_dummy ON [schema1].partitioned_clustered_columnstore
+GO
+CREATE CLUSTERED COLUMNSTORE INDEX ixcc_dummy ON [schema1].partitioned_clustered_columnstore ON ps_dummy(id)
+GO
+
+ALTER TABLE [schema1].clustered_columnstore
+DROP COLUMN col33;
+GO
+
+CREATE CLUSTERED COLUMNSTORE INDEX ixcc_dummy ON [schema1].clustered_columnstore
+GO
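The script above creates one test table per index layout (heap, clustered rowstore, clustered columnstore, each with and without partitioning), which is handy for exercising the new collector. A possible smoke test — the connection string is a placeholder and this class is not part of the PR; it only assumes the types introduced in `TableInfoCollector.cs`:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using NLog;

namespace SmartBulkCopy
{
    class CollectorSmokeTest
    {
        static async Task Main()
        {
            // Placeholder: point this at the database where sql/SQLQuery1.sql was run.
            var cs = "Server=localhost;Database=test;Integrated Security=true;";

            var tables = new List<string> {
                "[schema1].[heap]",
                "[schema1].[clustered_rowstore]",
                "[schema1].[partitioned_clustered_columnstore]"
            };

            var collector = new TablesInfoCollector(cs, tables, LogManager.GetCurrentClassLogger());

            // Print what the collector detected for each test table.
            foreach (var ti in await collector.CollectTablesInfoAsync())
            {
                Console.WriteLine($"{ti.TableLocation}: exists={ti.Exists}, " +
                    $"primaryIndex={ti.PrimaryIndex.GetType().Name}, partitioned={ti.PrimaryIndex.IsPartitioned}");
            }
        }
    }
}
```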