Revert removal of MemoryStream in BulkCopy #478

Merged
merged 1 commit on May 13, 2024
14 changes: 0 additions & 14 deletions ClickHouse.Client/ADO/ClickHouseConnection.cs
@@ -296,20 +296,6 @@ public async Task PostStreamAsync(string sql, Stream data, bool isCompressed, Ca
await HandleError(response, sql, activity).ConfigureAwait(false);
}

internal async Task PostContentAsync(string sql, HttpContent httpData, CancellationToken token)
{
using var activity = this.StartActivity("PostStreamAsync");
activity.SetQuery(sql);

var builder = CreateUriBuilder(sql);
using var postMessage = new HttpRequestMessage(HttpMethod.Post, builder.ToString());
AddDefaultHttpHeaders(postMessage.Headers);

postMessage.Content = httpData;
using var response = await HttpClient.SendAsync(postMessage, HttpCompletionOption.ResponseContentRead, token).ConfigureAwait(false);
await HandleError(response, sql, activity).ConfigureAwait(false);
}

public new ClickHouseCommand CreateCommand() => new ClickHouseCommand(this);

void IDisposable.Dispose()
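For orientation (not part of the PR itself): with PostContentAsync gone, PostStreamAsync is once again the only upload path, and BulkCopy hands it a fully serialized stream. A minimal calling sketch, assuming the PostStreamAsync signature shown in the hunk above; the UploadAsync wrapper and the payload stream are illustrative only:

```csharp
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using ClickHouse.Client.ADO;

internal static class BulkUploadSketch
{
    // Sketch only. sql is passed as null because BulkCopy writes the INSERT statement
    // into the payload itself; isCompressed: true marks the stream as already gzip-encoded.
    public static Task UploadAsync(ClickHouseConnection connection, Stream payload, CancellationToken token)
        => connection.PostStreamAsync(null, payload, true, token);
}
```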
128 changes: 54 additions & 74 deletions ClickHouse.Client/Copy/ClickHouseBulkCopy.cs
@@ -5,11 +5,7 @@
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using ClickHouse.Client.ADO;
@@ -126,7 +122,7 @@
tasks[i] = Task.CompletedTask;
}

foreach (var batch in IntoBatchContents(rows, query, columnTypes))
foreach (var batch in IntoBatches(rows, query, columnTypes))
{
while (true)
{
@@ -147,87 +143,34 @@
await Task.WhenAll(tasks).ConfigureAwait(false);
}

private async Task SendBatchAsync(BulkCopyHttpContent batchContent, CancellationToken token)
private Stream SerializeBatch(Batch batch)

Check warning on line 146 in ClickHouse.Client/Copy/ClickHouseBulkCopy.cs (GitHub Actions / Short): Member 'SerializeBatch' does not access instance data and can be marked as static (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/quality-rules/ca1822)
{
using (batchContent)
{
// Async sending
await connection.PostContentAsync(null, batchContent, token).ConfigureAwait(false);
// Increase counter
Interlocked.Add(ref rowsWritten, batchContent.Size);
}
}
var stream = new MemoryStream() { Capacity = 8 * 1024 };

public void Dispose()
{
if (ownsConnection)
using (var gzipStream = new BufferedStream(new GZipStream(stream, CompressionLevel.Fastest, true), 256 * 1024))
{
connection?.Dispose();
}
GC.SuppressFinalize(this);
}

private static string GetColumnsExpression(IReadOnlyCollection<string> columns) => columns == null || columns.Count == 0 ? "*" : string.Join(",", columns);

private IEnumerable<BulkCopyHttpContent> IntoBatchContents(IEnumerable<object[]> rows, string query, ClickHouseType[] types)
{
foreach (var (batch, size) in rows.BatchRented(BatchSize))
{
yield return new BulkCopyHttpContent(query, batch, size, types);
}
}

private class BulkCopyHttpContent : HttpContent
{
private readonly string query;
private readonly object[][] rows;
private readonly int size;
private readonly ClickHouseType[] types;

public BulkCopyHttpContent(string query, object[][] rows, int size, ClickHouseType[] types)
{
this.query = query;
this.rows = rows;
this.size = size;
this.types = types;
Headers.ContentType = new MediaTypeHeaderValue("application/octet-stream");
Headers.ContentEncoding.Add("gzip");
}

public int Size => size;

protected override async Task SerializeToStreamAsync(Stream stream, TransportContext context)
{
using (var gzipStream = new BufferedStream(new GZipStream(stream, CompressionLevel.Fastest, true), 256 * 1024))
using (var textWriter = new StreamWriter(gzipStream, Encoding.UTF8, 4 * 1024, true))
{
await SerializeBatchAsync(gzipStream).ConfigureAwait(false);
textWriter.WriteLine(batch.Query);
}
}

private async Task SerializeBatchAsync(Stream stream)
{
using (var textWriter = new StreamWriter(stream, Encoding.UTF8, 4 * 1024, true))
{
await textWriter.WriteLineAsync(query).ConfigureAwait(false);
}

using var writer = new ExtendedBinaryWriter(stream);
using var writer = new ExtendedBinaryWriter(gzipStream);

int col = 0;
object[] row = null;
int counter = 0;
var enumerator = rows.GetEnumerator();
var enumerator = batch.Rows.GetEnumerator();
try
{
while (enumerator.MoveNext())
{
row = (object[])enumerator.Current;
for (col = 0; col < row.Length; col++)
{
types[col].Write(writer, row[col]);
batch.Types[col].Write(writer, row[col]);
}
counter++;
if (counter >= size)
if (counter >= batch.Size)
break; // We've reached the batch size
}
}
Expand All @@ -236,20 +179,57 @@
throw new ClickHouseBulkCopySerializationException(row, col, e);
}
}
stream.Seek(0, SeekOrigin.Begin);
return stream;
}

private async Task SendBatchAsync(Batch batch, CancellationToken token)
{
using (batch) // Dispose object regardless whether sending succeeds
{
// Async serialization
using var stream = await Task.Run(() => SerializeBatch(batch)).ConfigureAwait(false);
// Async sending
await connection.PostStreamAsync(null, stream, true, token).ConfigureAwait(false);
// Increase counter
Interlocked.Add(ref rowsWritten, batch.Size);
}
}

public void Dispose()
{
if (ownsConnection)
{
connection?.Dispose();
}
GC.SuppressFinalize(this);
}

protected override bool TryComputeLength(out long length)
private static string GetColumnsExpression(IReadOnlyCollection<string> columns) => columns == null || columns.Count == 0 ? "*" : string.Join(",", columns);

private IEnumerable<Batch> IntoBatches(IEnumerable<object[]> rows, string query, ClickHouseType[] types)
{
foreach (var (batch, size) in rows.BatchRented(BatchSize))
{
length = 0;
return false;
yield return new Batch { Rows = batch, Size = size, Query = query, Types = types };
}
}

// Convenience argument collection
private struct Batch : IDisposable
{
public object[][] Rows;
public int Size;
public string Query;
public ClickHouseType[] Types;

protected override void Dispose(bool disposing)
public void Dispose()
{
if (disposing)
if (Rows != null)
{
ArrayPool<object[]>.Shared.Return(rows);
ArrayPool<object[]>.Shared.Return(Rows);
Rows = null;
}
base.Dispose(disposing);
}
}
}
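Taken together, the reverted path builds each batch entirely in memory (query line first, then the binary row data, all gzip-compressed into a seekable MemoryStream) and only then posts it via PostStreamAsync, instead of streaming through a custom HttpContent. A rough standalone sketch of that serialization shape, using only BCL types; the class name, the Serialize method, and the byte[][] rowPayloads stand-in for ExtendedBinaryWriter plus ClickHouseType.Write are assumptions for illustration:

```csharp
using System.IO;
using System.IO.Compression;
using System.Text;

internal static class BatchSerializationSketch
{
    // Produces a rewound stream whose contents are gzip-compressed:
    // first the INSERT query line, then the binary row payload.
    public static Stream Serialize(string query, byte[][] rowPayloads)
    {
        var stream = new MemoryStream();

        // leaveOpen: true keeps the MemoryStream alive after the gzip wrapper is disposed.
        using (var gzip = new GZipStream(stream, CompressionLevel.Fastest, leaveOpen: true))
        {
            using (var writer = new StreamWriter(gzip, Encoding.UTF8, 4 * 1024, leaveOpen: true))
            {
                writer.WriteLine(query); // the query travels as the first line of the payload
            }

            foreach (var row in rowPayloads)
            {
                gzip.Write(row, 0, row.Length); // stand-in for ExtendedBinaryWriter + ClickHouseType.Write
            }
        }

        stream.Seek(0, SeekOrigin.Begin); // rewind so the HTTP layer reads from the start
        return stream;
    }
}
```

One consequence visible in the diff: because each batch is fully serialized before the request starts, the payload length is known up front, whereas the removed BulkCopyHttpContent serialized during the request and reported TryComputeLength as false.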