From bca4d5684a1610e616c4ba654d5d8ad30f66cd38 Mon Sep 17 00:00:00 2001 From: Pavel Kravtsov Date: Mon, 5 Feb 2024 19:23:46 +0800 Subject: [PATCH 1/3] Update ClickHouse.Client -> 7.0.0 --- .../ClickHouseConnectionBrokerStub.cs | 3 ++- .../ClickHouse.Facades.csproj | 2 +- .../Context/ClickHouseConnectionBroker.cs | 15 ++++++++++----- .../Context/ClickHouseFacade.cs | 17 ++++++++++++----- 4 files changed, 25 insertions(+), 12 deletions(-) diff --git a/src/ClickHouse.Facades.Testing/ClickHouseConnectionBrokerStub.cs b/src/ClickHouse.Facades.Testing/ClickHouseConnectionBrokerStub.cs index 45135ca..1924676 100644 --- a/src/ClickHouse.Facades.Testing/ClickHouseConnectionBrokerStub.cs +++ b/src/ClickHouse.Facades.Testing/ClickHouseConnectionBrokerStub.cs @@ -79,7 +79,8 @@ internal override Task BulkInsertAsync( string destinationTable, Func saveAction, int batchSize, - int maxDegreeOfParallelism) + int maxDegreeOfParallelism, + IReadOnlyCollection? columnNames = null) { throw new NotImplementedException(); } diff --git a/src/ClickHouse.Facades/ClickHouse.Facades.csproj b/src/ClickHouse.Facades/ClickHouse.Facades.csproj index 002a45b..0b88c25 100644 --- a/src/ClickHouse.Facades/ClickHouse.Facades.csproj +++ b/src/ClickHouse.Facades/ClickHouse.Facades.csproj @@ -33,7 +33,7 @@ - + diff --git a/src/ClickHouse.Facades/Context/ClickHouseConnectionBroker.cs b/src/ClickHouse.Facades/Context/ClickHouseConnectionBroker.cs index f516275..98c5c9c 100644 --- a/src/ClickHouse.Facades/Context/ClickHouseConnectionBroker.cs +++ b/src/ClickHouse.Facades/Context/ClickHouseConnectionBroker.cs @@ -75,7 +75,8 @@ internal virtual async Task BulkInsertAsync( string destinationTable, Func saveAction, int batchSize, - int maxDegreeOfParallelism) + int maxDegreeOfParallelism, + IReadOnlyCollection? columnNames = null) { ThrowIfNotConnected(); @@ -97,11 +98,15 @@ internal virtual async Task BulkInsertAsync( throw new InvalidOperationException($"Sessions are not compatible with parallel insertion."); } - using var bulkCopyInterface = new ClickHouseBulkCopy(_connection); - bulkCopyInterface.DestinationTableName = destinationTable; - bulkCopyInterface.BatchSize = batchSize; - bulkCopyInterface.MaxDegreeOfParallelism = maxDegreeOfParallelism; + using var bulkCopyInterface = new ClickHouseBulkCopy(_connection) + { + DestinationTableName = destinationTable, + BatchSize = batchSize, + MaxDegreeOfParallelism = maxDegreeOfParallelism, + ColumnNames = columnNames, + }; + await bulkCopyInterface.InitAsync(); await saveAction(bulkCopyInterface); return bulkCopyInterface.RowsWritten; diff --git a/src/ClickHouse.Facades/Context/ClickHouseFacade.cs b/src/ClickHouse.Facades/Context/ClickHouseFacade.cs index 4d6b7e0..5dcc501 100644 --- a/src/ClickHouse.Facades/Context/ClickHouseFacade.cs +++ b/src/ClickHouse.Facades/Context/ClickHouseFacade.cs @@ -76,16 +76,17 @@ protected DataTable ExecuteDataTable(string query, CancellationToken cancellatio protected Task BulkInsertAsync( string destinationTable, IEnumerable rows, - IReadOnlyCollection? columns = null, + IReadOnlyCollection? columnNames = null, int batchSize = 100000, int maxDegreeOfParallelism = 4, CancellationToken cancellationToken = default) { return BulkInsertAsync( destinationTable, - bulkInterface => bulkInterface.WriteToServerAsync(rows, columns, cancellationToken), + bulkInterface => bulkInterface.WriteToServerAsync(rows, cancellationToken), batchSize, - maxDegreeOfParallelism); + maxDegreeOfParallelism, + columnNames); } protected Task BulkInsertAsync( @@ -124,8 +125,14 @@ private Task BulkInsertAsync( string destinationTable, Func saveAction, int batchSize, - int maxDegreeOfParallelism) + int maxDegreeOfParallelism, + IReadOnlyCollection? columnNames = null) { - return _connectionBroker.BulkInsertAsync(destinationTable, saveAction, batchSize, maxDegreeOfParallelism); + return _connectionBroker.BulkInsertAsync( + destinationTable, + saveAction, + batchSize, + maxDegreeOfParallelism, + columnNames); } } From 53f9b472c0474907d962a302cf768af13ccd1597 Mon Sep 17 00:00:00 2001 From: Pavel Kravtsov Date: Mon, 5 Feb 2024 19:25:27 +0800 Subject: [PATCH 2/3] Bump packages --- .../ClickHouse.Facades.Example.csproj | 2 +- .../ClickHouse.Facades.Tests.csproj | 13 ++++++++----- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/src/ClickHouse.Facades.Example/ClickHouse.Facades.Example.csproj b/src/ClickHouse.Facades.Example/ClickHouse.Facades.Example.csproj index 6b6a264..7bd37c2 100644 --- a/src/ClickHouse.Facades.Example/ClickHouse.Facades.Example.csproj +++ b/src/ClickHouse.Facades.Example/ClickHouse.Facades.Example.csproj @@ -13,7 +13,7 @@ - + diff --git a/src/ClickHouse.Facades.Tests/ClickHouse.Facades.Tests.csproj b/src/ClickHouse.Facades.Tests/ClickHouse.Facades.Tests.csproj index b7f20b4..23f9dee 100644 --- a/src/ClickHouse.Facades.Tests/ClickHouse.Facades.Tests.csproj +++ b/src/ClickHouse.Facades.Tests/ClickHouse.Facades.Tests.csproj @@ -11,11 +11,14 @@ - - - - - + + + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + From 6175c08239150ce1799152713e94caed483e2dd3 Mon Sep 17 00:00:00 2001 From: Pavel Kravtsov Date: Mon, 5 Feb 2024 19:52:57 +0800 Subject: [PATCH 3/3] Add BulkInsert examples --- .../Context/OrdersFacade.cs | 37 +++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/src/ClickHouse.Facades.Example/Context/OrdersFacade.cs b/src/ClickHouse.Facades.Example/Context/OrdersFacade.cs index 9690f03..a384d30 100644 --- a/src/ClickHouse.Facades.Example/Context/OrdersFacade.cs +++ b/src/ClickHouse.Facades.Example/Context/OrdersFacade.cs @@ -57,4 +57,41 @@ user_id asc limit 1 "; #endregion + + #region BulkInsert + public async Task InsertOrdersBulk(CancellationToken cancellationToken = default) + { + await BulkInsertAsync( + "example_orders", + Enumerable.Range(0, 100).Select(i => new object[] { i % 10 + 1, i * 2 + 1, (i + 1) / 0.33 }), + new[] { "user_id", "order_id", "price" }, + batchSize: 45, + maxDegreeOfParallelism: 2, + cancellationToken: cancellationToken); + } + + public async Task CopyOrdersBulk(CancellationToken cancellationToken = default) + { + var reader = await ExecuteReaderAsync("select * from example_orders where user_id = 1", cancellationToken); + + await BulkInsertAsync( + "example_orders", + reader, + batchSize: 45, + maxDegreeOfParallelism: 2, + cancellationToken: cancellationToken); + } + + public async Task CopyOrdersDataTable(CancellationToken cancellationToken = default) + { + var dataTable = ExecuteDataTable("select * from example_orders where user_id = 10", cancellationToken); + + await BulkInsertAsync( + "example_orders", + dataTable, + batchSize: 45, + maxDegreeOfParallelism: 2, + cancellationToken: cancellationToken); + } + #endregion }