Commit

BulkCopy Remove MemoryStream To Reduce Memory Usage (#464)
MJEdwin authored Apr 19, 2024
1 parent 8d7085c commit 8dea7cb
Showing 2 changed files with 88 additions and 54 deletions.
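
What changed, in short: before this commit every batch was serialized (gzip-compressed) into a MemoryStream and only then posted via PostStreamAsync, so each in-flight batch was buffered in full in managed memory. The commit adds a connection-level PostContentAsync overload plus a BulkCopyHttpContent derived from HttpContent that writes the query line and binary rows straight into the request stream, compressing on the fly. Below is a minimal, self-contained sketch of that streaming-HttpContent technique; the class and payload are illustrative only, not types from the library:

using System.IO;
using System.IO.Compression;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Threading.Tasks;

// Illustrative only: push gzip-compressed data directly into the request body
// instead of rendering it into a MemoryStream first.
class StreamingGzipContent : HttpContent
{
    private readonly string payload;

    public StreamingGzipContent(string payload)
    {
        this.payload = payload;
        Headers.ContentType = new MediaTypeHeaderValue("application/octet-stream");
        Headers.ContentEncoding.Add("gzip");
    }

    // Called by HttpClient while the request is being sent; the data is
    // compressed and written as it goes rather than materialized up front.
    protected override async Task SerializeToStreamAsync(Stream stream, TransportContext context)
    {
        using var gzip = new GZipStream(stream, CompressionLevel.Fastest, leaveOpen: true);
        using var writer = new StreamWriter(gzip, Encoding.UTF8);
        await writer.WriteAsync(payload).ConfigureAwait(false);
    }

    // Length is unknown up front, so the request goes out with chunked transfer encoding.
    protected override bool TryComputeLength(out long length)
    {
        length = 0;
        return false;
    }
}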
14 changes: 14 additions & 0 deletions ClickHouse.Client/ADO/ClickHouseConnection.cs
@@ -296,6 +296,20 @@ public async Task PostStreamAsync(string sql, Stream data, bool isCompressed, Ca
         await HandleError(response, sql, activity).ConfigureAwait(false);
     }
 
+    internal async Task PostContentAsync(string sql, HttpContent httpData, CancellationToken token)
+    {
+        using var activity = this.StartActivity("PostStreamAsync");
+        activity.SetQuery(sql);
+
+        var builder = CreateUriBuilder(sql);
+        using var postMessage = new HttpRequestMessage(HttpMethod.Post, builder.ToString());
+        AddDefaultHttpHeaders(postMessage.Headers);
+
+        postMessage.Content = httpData;
+        using var response = await HttpClient.SendAsync(postMessage, HttpCompletionOption.ResponseContentRead, token).ConfigureAwait(false);
+        await HandleError(response, sql, activity).ConfigureAwait(false);
+    }
+
     public new ClickHouseCommand CreateCommand() => new ClickHouseCommand(this);
 
     void IDisposable.Dispose()
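PostContentAsync mirrors the existing PostStreamAsync but accepts an arbitrary HttpContent, so the caller decides how and when the body is serialized and compressed; the response is still fully read and checked through HandleError. Since the method is internal, a call site can only live inside ClickHouse.Client itself. A hedged illustration follows (the statement, table, and payload are made up; the real caller is the bulk-copy code in the next file, which passes sql: null and embeds the query inside its gzip-compressed content):

using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading;
using System.Threading.Tasks;
using ClickHouse.Client.ADO;

internal static class PostContentExample
{
    // rowBinaryPayload stands in for rows already serialized in RowBinary format.
    internal static async Task SendAsync(ClickHouseConnection connection, byte[] rowBinaryPayload)
    {
        var body = new ByteArrayContent(rowBinaryPayload);
        body.Headers.ContentType = new MediaTypeHeaderValue("application/octet-stream");

        // The INSERT statement travels in the request URL; the body is posted as-is.
        await connection.PostContentAsync("INSERT INTO t (a, b) FORMAT RowBinary", body, CancellationToken.None);
    }
}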
128 changes: 74 additions & 54 deletions ClickHouse.Client/Copy/ClickHouseBulkCopy.cs
@@ -5,7 +5,11 @@
 using System.IO;
 using System.IO.Compression;
 using System.Linq;
+using System.Net;
+using System.Net.Http;
+using System.Net.Http.Headers;
 using System.Text;
+using System.Text.RegularExpressions;
 using System.Threading;
 using System.Threading.Tasks;
 using ClickHouse.Client.ADO;
@@ -122,7 +126,7 @@ public async Task WriteToServerAsync(IEnumerable<object[]> rows, CancellationTok
             tasks[i] = Task.CompletedTask;
         }
 
-        foreach (var batch in IntoBatches(rows, query, columnTypes))
+        foreach (var batch in IntoBatchContents(rows, query, columnTypes))
         {
             while (true)
             {
@@ -143,34 +147,87 @@ public async Task WriteToServerAsync(IEnumerable<object[]> rows, CancellationTok
         await Task.WhenAll(tasks).ConfigureAwait(false);
     }
 
-    private Stream SerializeBatch(Batch batch)
+    private async Task SendBatchAsync(BulkCopyHttpContent batchContent, CancellationToken token)
     {
-        var stream = new MemoryStream() { Capacity = 8 * 1024 };
+        using (batchContent)
+        {
+            // Async sending
+            await connection.PostContentAsync(null, batchContent, token).ConfigureAwait(false);
+            // Increase counter
+            Interlocked.Add(ref rowsWritten, batchContent.Size);
+        }
+    }
+
-        using (var gzipStream = new BufferedStream(new GZipStream(stream, CompressionLevel.Fastest, true), 256 * 1024))
+    public void Dispose()
     {
+        if (ownsConnection)
         {
-            using (var textWriter = new StreamWriter(gzipStream, Encoding.UTF8, 4 * 1024, true))
+            connection?.Dispose();
         }
+        GC.SuppressFinalize(this);
     }
 
+    private static string GetColumnsExpression(IReadOnlyCollection<string> columns) => columns == null || columns.Count == 0 ? "*" : string.Join(",", columns);
 
+    private IEnumerable<BulkCopyHttpContent> IntoBatchContents(IEnumerable<object[]> rows, string query, ClickHouseType[] types)
     {
+        foreach (var (batch, size) in rows.BatchRented(BatchSize))
         {
+            yield return new BulkCopyHttpContent(query, batch, size, types);
         }
     }
 
+    private class BulkCopyHttpContent : HttpContent
     {
+        private readonly string query;
+        private readonly object[][] rows;
+        private readonly int size;
+        private readonly ClickHouseType[] types;
 
+        public BulkCopyHttpContent(string query, object[][] rows, int size, ClickHouseType[] types)
         {
+            this.query = query;
+            this.rows = rows;
+            this.size = size;
+            this.types = types;
+            Headers.ContentType = new MediaTypeHeaderValue("application/octet-stream");
+            Headers.ContentEncoding.Add("gzip");
         }
 
+        public int Size => size;
 
+        protected override async Task SerializeToStreamAsync(Stream stream, TransportContext context)
         {
+            using (var gzipStream = new BufferedStream(new GZipStream(stream, CompressionLevel.Fastest, true), 256 * 1024))
             {
-                textWriter.WriteLine(batch.Query);
+                await SerializeBatchAsync(gzipStream).ConfigureAwait(false);
             }
         }
 
-            using var writer = new ExtendedBinaryWriter(gzipStream);
+        private async Task SerializeBatchAsync(Stream stream)
         {
+            using (var textWriter = new StreamWriter(stream, Encoding.UTF8, 4 * 1024, true))
             {
+                await textWriter.WriteLineAsync(query).ConfigureAwait(false);
             }
 
+            using var writer = new ExtendedBinaryWriter(stream);
 
             int col = 0;
             object[] row = null;
             int counter = 0;
-            var enumerator = batch.Rows.GetEnumerator();
+            var enumerator = rows.GetEnumerator();
             try
             {
                 while (enumerator.MoveNext())
                 {
                     row = (object[])enumerator.Current;
                     for (col = 0; col < row.Length; col++)
                     {
-                        batch.Types[col].Write(writer, row[col]);
+                        types[col].Write(writer, row[col]);
                     }
                     counter++;
-                    if (counter >= batch.Size)
+                    if (counter >= size)
                         break; // We've reached the batch size
                 }
             }
@@ -179,57 +236,20 @@ private Stream SerializeBatch(Batch batch)
                 throw new ClickHouseBulkCopySerializationException(row, col, e);
             }
         }
-        stream.Seek(0, SeekOrigin.Begin);
-        return stream;
-    }
 
-    private async Task SendBatchAsync(Batch batch, CancellationToken token)
-    {
-        using (batch) // Dispose object regardless whether sending succeeds
-        {
-            // Async serialization
-            using var stream = await Task.Run(() => SerializeBatch(batch)).ConfigureAwait(false);
-            // Async sending
-            await connection.PostStreamAsync(null, stream, true, token).ConfigureAwait(false);
-            // Increase counter
-            Interlocked.Add(ref rowsWritten, batch.Size);
-        }
-    }
-
-    public void Dispose()
-    {
-        if (ownsConnection)
-        {
-            connection?.Dispose();
-        }
-        GC.SuppressFinalize(this);
-    }
-
-    private static string GetColumnsExpression(IReadOnlyCollection<string> columns) => columns == null || columns.Count == 0 ? "*" : string.Join(",", columns);
-
-    private IEnumerable<Batch> IntoBatches(IEnumerable<object[]> rows, string query, ClickHouseType[] types)
-    {
-        foreach (var (batch, size) in rows.BatchRented(BatchSize))
+        protected override bool TryComputeLength(out long length)
         {
-            yield return new Batch { Rows = batch, Size = size, Query = query, Types = types };
+            length = 0;
+            return false;
        }
-    }
-
-    // Convenience argument collection
-    private struct Batch : IDisposable
-    {
-        public object[] Rows;
-        public int Size;
-        public string Query;
-        public ClickHouseType[] Types;
-
-        public void Dispose()
+        protected override void Dispose(bool disposing)
        {
-            if (Rows != null)
+            if (disposing)
            {
-                ArrayPool<object>.Shared.Return(Rows);
-                Rows = null;
+                ArrayPool<object[]>.Shared.Return(rows);
            }
+            base.Dispose(disposing);
        }
    }
}
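
For context, a sketch of how the bulk copy is typically driven from user code; the property and method names used here (DestinationTableName, BatchSize, InitAsync, RowsWritten) are assumed from the library's public surface around this release, and the connection string, table, and rows are made up. The effect of this commit is that each batch produced below is compressed and streamed directly onto the wire instead of being rendered into a MemoryStream first.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using ClickHouse.Client.ADO;
using ClickHouse.Client.Copy;

internal static class BulkCopyExample
{
    public static async Task Main()
    {
        using var connection = new ClickHouseConnection("Host=localhost;Port=8123;Username=default");

        using var bulkCopy = new ClickHouseBulkCopy(connection)
        {
            DestinationTableName = "default.events", // made-up table
            BatchSize = 100_000,
        };
        await bulkCopy.InitAsync(); // resolves destination column types

        // Rows are produced lazily; nothing forces the whole set into memory at once.
        IEnumerable<object[]> rows = Enumerable.Range(0, 1_000_000)
            .Select(i => new object[] { i, $"event-{i}", DateTime.UtcNow });

        await bulkCopy.WriteToServerAsync(rows, CancellationToken.None);
        Console.WriteLine($"{bulkCopy.RowsWritten} rows written");
    }
}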
