Skip to content

Commit

Permalink
Enrich BulkCopy exceptions with row/row index information #188 (#190)
Browse files Browse the repository at this point in the history
  • Loading branch information
DarkWanderer authored Sep 1, 2022
1 parent b7104b6 commit 2547ba9
Show file tree
Hide file tree
Showing 8 changed files with 108 additions and 20 deletions.
30 changes: 27 additions & 3 deletions ClickHouse.Client.Tests/BulkCopyTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,11 @@ public async Task ShouldExecuteBulkInsertWithComplexColumnName(string columnName
[RequiredFeature(Feature.InlineQuery)]
public async Task ShouldInsertIntoTableWithLotsOfColumns()
{
var tblName = "test.bulk_long_columns";
var tableName = "test.bulk_long_columns";
var columnCount = 3900;

//Generating create tbl statement with a lot of columns
var query = $"CREATE TABLE IF NOT EXISTS {tblName}(\n";
var query = $"CREATE TABLE IF NOT EXISTS {tableName}(\n";
var columns = Enumerable.Range(1, columnCount)
.Select(x => $" some_loooooooooooooonnnnnnnnnnnngggggggg_column_name_{x} Int32");
query += string.Join(",\n", columns);
Expand All @@ -139,12 +139,36 @@ public async Task ShouldInsertIntoTableWithLotsOfColumns()
//Create tbl in db
await connection.ExecuteStatementAsync(query);

var bulkCopy = new ClickHouseBulkCopy(connection) { DestinationTableName = tblName };
var bulkCopy = new ClickHouseBulkCopy(connection) { DestinationTableName = tableName };

var rowToInsert = new[] { Enumerable.Range(1, columnCount).Select(x => (object)x).ToArray() };
await bulkCopy.WriteToServerAsync(rowToInsert);
}

[Test]
public async Task ShouldThrowSpecialExceptionOnSerializationFailure()
{
var targetTable = "test." + SanitizeTableName($"bulk_exception_uint8");

await connection.ExecuteStatementAsync($"TRUNCATE TABLE IF EXISTS {targetTable}");
await connection.ExecuteStatementAsync($"CREATE TABLE IF NOT EXISTS {targetTable} (value UInt8) ENGINE Memory");

var rows = Enumerable.Range(250, 10).Select(n => new object[] { n }).ToArray();

var bulkCopy = new ClickHouseBulkCopy(connection) { DestinationTableName = targetTable };
try
{
await bulkCopy.WriteToServerAsync(rows);
Assert.Fail("Bulk copy did not throw exception on failed serialization");
}
catch (ClickHouseBulkCopySerializationException ex)
{
CollectionAssert.AreEqual(new object[] { 256 }, ex.Row);
Assert.AreEqual(0, ex.Index);
Assert.IsInstanceOf<OverflowException>(ex.InnerException);
}
}

[Test]
public async Task ShouldExecuteBulkInsertIntoSimpleAggregatedFunctionColumn()
{
Expand Down
4 changes: 2 additions & 2 deletions ClickHouse.Client/ADO/Readers/ClickHouseDataReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public override bool Read()
return true;
}

#pragma warning disable CA2215 // Dispose methods should call base class dispose
#pragma warning disable CA2215 // Dispose methods should call base class dispose
protected override void Dispose(bool disposing)
{
if (disposing)
Expand All @@ -186,7 +186,7 @@ protected override void Dispose(bool disposing)
reader?.Dispose();
}
}
#pragma warning restore CA2215 // Dispose methods should call base class dispose
#pragma warning restore CA2215 // Dispose methods should call base class dispose

private void ReadHeaders()
{
Expand Down
23 changes: 18 additions & 5 deletions ClickHouse.Client/Copy/ClickHouseBulkCopy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public async Task WriteToServerAsync(IEnumerable<object[]> rows, IReadOnlyCollec
ClickHouseType[] columnTypes = null;
string[] columnNames = columns?.ToArray();

using (var reader = (ClickHouseDataReader)await connection.ExecuteReaderAsync($"SELECT {GetColumnsExpression(columns)} FROM {DestinationTableName} WHERE 1=0").ConfigureAwait(false))
using (var reader = (ClickHouseDataReader)await connection.ExecuteReaderAsync($"SELECT {ClickHouseBulkCopy.GetColumnsExpression(columns)} FROM {DestinationTableName} WHERE 1=0").ConfigureAwait(false))
{
columnTypes = reader.GetClickHouseColumnTypes();
columnNames ??= reader.GetColumnNames();
Expand Down Expand Up @@ -148,7 +148,7 @@ public void Dispose()
GC.SuppressFinalize(this);
}

private string GetColumnsExpression(IReadOnlyCollection<string> columns) => columns == null || columns.Count == 0 ? "*" : string.Join(",", columns);
private static string GetColumnsExpression(IReadOnlyCollection<string> columns) => columns == null || columns.Count == 0 ? "*" : string.Join(",", columns);

private async Task PushBatch(ICollection<object[]> rows, ClickHouseType[] columnTypes, string[] columnNames, CancellationToken token)
{
Expand All @@ -166,13 +166,26 @@ private async Task PushBatch(ICollection<object[]> rows, ClickHouseType[] column
}

using var writer = new ExtendedBinaryWriter(gzipStream);
foreach (var row in rows)

// Performance optimization: declare vars in advance to use wider `try` block
object[] row = null;
int i = 0;
try
{
for (var i = 0; i < row.Length; i++)
using var enumerator = rows.GetEnumerator();
while (enumerator.MoveNext())
{
columnTypes[i].Write(writer, row[i]);
row = enumerator.Current;
for (i = 0; i < row.Length; i++)
{
columnTypes[i].Write(writer, row[i]);
}
}
}
catch (Exception e)
{
throw new ClickHouseBulkCopySerializationException(row, i, e);
}
}
stream.Seek(0, SeekOrigin.Begin);

Expand Down
38 changes: 38 additions & 0 deletions ClickHouse.Client/Copy/ClickHouseBulkCopySerializationException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
using System;
using System.Collections.Generic;
using System.Data;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using ClickHouse.Client.ADO;
using ClickHouse.Client.ADO.Readers;
using ClickHouse.Client.Formats;
using ClickHouse.Client.Properties;
using ClickHouse.Client.Types;
using ClickHouse.Client.Utility;

namespace ClickHouse.Client.Copy
{
public class ClickHouseBulkCopySerializationException : Exception
{
public ClickHouseBulkCopySerializationException(object[] row, int index, Exception innerException)
: base("Error when serializing data", innerException)
{
Row = row;
Index = index;
}

/// <summary>
/// Gets row at which exception happened
/// </summary>
public object[] Row { get; }

/// <summary>
/// Gets index of bad value in row
/// </summary>
public int Index { get; }
}
}
30 changes: 23 additions & 7 deletions ClickHouse.Client/Formats/ExtendedBinaryReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,44 @@ public ExtendedBinaryReader(Stream stream)

public new int Read7BitEncodedInt() => base.Read7BitEncodedInt();

/// <summary>
/// Performs guaranteed read of requested number of bytes, or throws an exception
/// </summary>
/// <param name="buffer">buffer array</param>
/// <param name="index">index to write to in the buffer</param>
/// <param name="count">number of bytes to read</param>
/// <returns>number of bytes read, always equals to count</returns>
/// <exception cref="EndOfStreamException">thrown if requested number of bytes is not available</exception>
public override byte[] ReadBytes(int count)
{
var buffer = new byte[count];
Read(buffer, 0, count);
return buffer;
}

/// <summary>
/// Performs guaranteed read of requested number of bytes, or throws an exception
/// </summary>
/// <param name="buffer">buffer array</param>
/// <param name="index">index to write to in the buffer</param>
/// <param name="count">number of bytes to read</param>
/// <returns>number of bytes read, always equals to count</returns>
/// <exception cref="EndOfStreamException">thrown if requested number of bytes is not available</exception>
public override int Read(byte[] buffer, int index, int count)
{
int read = 0;
int bytesRead = 0;
do
{
int num2 = base.Read(buffer, index + read, count - read);
read += num2;
if (read < count && PeekChar() == -1)
int read = base.Read(buffer, index + bytesRead, count - bytesRead);
bytesRead += read;
if (read == 0 && bytesRead < count)
{
throw new EndOfStreamException($"Expected to read {count} bytes, got {read}");
throw new EndOfStreamException($"Expected to read {count} bytes, got {bytesRead}");
}
}
while (read < count);
while (bytesRead < count);

return read;
return bytesRead;
}

public override int PeekChar() => streamWrapper.Peek();
Expand Down
1 change: 0 additions & 1 deletion ClickHouse.Client/Types/MultiPolygonType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

namespace ClickHouse.Client.Types
{

internal class MultiPolygonType : ArrayType
{
public MultiPolygonType()
Expand Down
1 change: 0 additions & 1 deletion ClickHouse.Client/Types/PolygonType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

namespace ClickHouse.Client.Types
{

internal class PolygonType : ArrayType
{
public PolygonType()
Expand Down
1 change: 0 additions & 1 deletion ClickHouse.Client/Types/RingType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

namespace ClickHouse.Client.Types
{

internal class RingType : ArrayType
{
public RingType()
Expand Down

0 comments on commit 2547ba9

Please sign in to comment.