Skip to content

Commit

Permalink
Merge pull request #18 from MikeAmputer/kravtsov/bump-packages
Browse files Browse the repository at this point in the history
Bump packages
  • Loading branch information
MikeAmputer authored Feb 5, 2024
2 parents 9e08287 + 6175c08 commit 5c4e13d
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Hosting" Version="7.0.1" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
</ItemGroup>

<ItemGroup>
Expand Down
37 changes: 37 additions & 0 deletions src/ClickHouse.Facades.Example/Context/OrdersFacade.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,41 @@ user_id asc
limit 1
";
#endregion

#region BulkInsert
public async Task InsertOrdersBulk(CancellationToken cancellationToken = default)
{
await BulkInsertAsync(
"example_orders",
Enumerable.Range(0, 100).Select(i => new object[] { i % 10 + 1, i * 2 + 1, (i + 1) / 0.33 }),
new[] { "user_id", "order_id", "price" },
batchSize: 45,
maxDegreeOfParallelism: 2,
cancellationToken: cancellationToken);
}

public async Task CopyOrdersBulk(CancellationToken cancellationToken = default)
{
var reader = await ExecuteReaderAsync("select * from example_orders where user_id = 1", cancellationToken);

await BulkInsertAsync(
"example_orders",
reader,
batchSize: 45,
maxDegreeOfParallelism: 2,
cancellationToken: cancellationToken);
}

public async Task CopyOrdersDataTable(CancellationToken cancellationToken = default)
{
var dataTable = ExecuteDataTable("select * from example_orders where user_id = 10", cancellationToken);

await BulkInsertAsync(
"example_orders",
dataTable,
batchSize: 45,
maxDegreeOfParallelism: 2,
cancellationToken: cancellationToken);
}
#endregion
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ internal override Task<long> BulkInsertAsync(
string destinationTable,
Func<ClickHouseBulkCopy, Task> saveAction,
int batchSize,
int maxDegreeOfParallelism)
int maxDegreeOfParallelism,
IReadOnlyCollection<string>? columnNames = null)
{
throw new NotImplementedException();
}
Expand Down
13 changes: 8 additions & 5 deletions src/ClickHouse.Facades.Tests/ClickHouse.Facades.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,14 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.7.1" />
<PackageReference Include="Moq" Version="4.20.69" />
<PackageReference Include="MSTest.TestAdapter" Version="2.2.10" />
<PackageReference Include="MSTest.TestFramework" Version="2.2.10" />
<PackageReference Include="coverlet.collector" Version="3.2.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.8.0" />
<PackageReference Include="Moq" Version="4.20.70" />
<PackageReference Include="MSTest.TestAdapter" Version="3.2.0" />
<PackageReference Include="MSTest.TestFramework" Version="3.2.0" />
<PackageReference Include="coverlet.collector" Version="6.0.0">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
</ItemGroup>

<ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion src/ClickHouse.Facades/ClickHouse.Facades.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="ClickHouse.Client" Version="6.7.5" />
<PackageReference Include="ClickHouse.Client" Version="7.0.0" />
<PackageReference Include="System.Linq.Async" Version="6.0.1" />
</ItemGroup>

Expand Down
15 changes: 10 additions & 5 deletions src/ClickHouse.Facades/Context/ClickHouseConnectionBroker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ internal virtual async Task<long> BulkInsertAsync(
string destinationTable,
Func<ClickHouseBulkCopy, Task> saveAction,
int batchSize,
int maxDegreeOfParallelism)
int maxDegreeOfParallelism,
IReadOnlyCollection<string>? columnNames = null)
{
ThrowIfNotConnected();

Expand All @@ -97,11 +98,15 @@ internal virtual async Task<long> BulkInsertAsync(
throw new InvalidOperationException($"Sessions are not compatible with parallel insertion.");
}

using var bulkCopyInterface = new ClickHouseBulkCopy(_connection);
bulkCopyInterface.DestinationTableName = destinationTable;
bulkCopyInterface.BatchSize = batchSize;
bulkCopyInterface.MaxDegreeOfParallelism = maxDegreeOfParallelism;
using var bulkCopyInterface = new ClickHouseBulkCopy(_connection)
{
DestinationTableName = destinationTable,
BatchSize = batchSize,
MaxDegreeOfParallelism = maxDegreeOfParallelism,
ColumnNames = columnNames,
};

await bulkCopyInterface.InitAsync();
await saveAction(bulkCopyInterface);

return bulkCopyInterface.RowsWritten;
Expand Down
17 changes: 12 additions & 5 deletions src/ClickHouse.Facades/Context/ClickHouseFacade.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,16 +76,17 @@ protected DataTable ExecuteDataTable(string query, CancellationToken cancellatio
protected Task<long> BulkInsertAsync(
string destinationTable,
IEnumerable<object[]> rows,
IReadOnlyCollection<string>? columns = null,
IReadOnlyCollection<string>? columnNames = null,
int batchSize = 100000,
int maxDegreeOfParallelism = 4,
CancellationToken cancellationToken = default)
{
return BulkInsertAsync(
destinationTable,
bulkInterface => bulkInterface.WriteToServerAsync(rows, columns, cancellationToken),
bulkInterface => bulkInterface.WriteToServerAsync(rows, cancellationToken),
batchSize,
maxDegreeOfParallelism);
maxDegreeOfParallelism,
columnNames);
}

protected Task<long> BulkInsertAsync(
Expand Down Expand Up @@ -124,8 +125,14 @@ private Task<long> BulkInsertAsync(
string destinationTable,
Func<ClickHouseBulkCopy, Task> saveAction,
int batchSize,
int maxDegreeOfParallelism)
int maxDegreeOfParallelism,
IReadOnlyCollection<string>? columnNames = null)
{
return _connectionBroker.BulkInsertAsync(destinationTable, saveAction, batchSize, maxDegreeOfParallelism);
return _connectionBroker.BulkInsertAsync(
destinationTable,
saveAction,
batchSize,
maxDegreeOfParallelism,
columnNames);
}
}

0 comments on commit 5c4e13d

Please sign in to comment.