Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

V2.10.1 Release #102

Merged
merged 10 commits into from
Mar 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions src/ParquetViewer.Engine/DataTableLite.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ namespace ParquetViewer.Engine
{
internal class DataTableLite
{
internal record ColumnLite(string Name, Type Type, string? Parent, int Ordinal);
internal record ColumnLite(string Name, Type Type, ParquetSchemaElement ParentSchema, int Ordinal);

private int _ordinal = 0;
private readonly Dictionary<string, ColumnLite> _columns = new();
private readonly List<object[]?> _rows;
private readonly List<object[]> _rows;

/// <summary>
/// Total number of rows in the opened parquet file(s)
Expand All @@ -24,14 +24,14 @@ internal record ColumnLite(string Name, Type Type, string? Parent, int Ordinal);
/// <summary>
/// Rows of the dataset
/// </summary>
public IReadOnlyList<object[]?> Rows => _rows;
public IReadOnlyList<object[]> Rows => _rows;

public DataTableLite(int expectedRowCount = 1000)
{
this._rows = new(expectedRowCount);
}

public ColumnLite AddColumn(string name, Type type, string? parent = null)
public ColumnLite AddColumn(string name, Type type, ParquetSchemaElement parent)
{
if (_rows.Count > 0)
{
Expand Down Expand Up @@ -61,7 +61,6 @@ public ColumnLite GetColumn(string name)
public DataTable ToDataTable(CancellationToken token, IProgress<int>? progress = null)
{
var dataTable = new DataTable();

foreach (var column in _columns)
{
token.ThrowIfCancellationRequested();
Expand All @@ -71,7 +70,7 @@ public DataTable ToDataTable(CancellationToken token, IProgress<int>? progress =
if (dataTable.Columns.Contains(columnLite.Name))
{
//DataTable's don't support case sensitive field names unfortunately
var columnPath = (columnLite.Parent is not null ? columnLite.Parent + "/" : string.Empty) + columnLite.Name;
var columnPath = columnLite.ParentSchema + "/" + columnLite.Name;
throw new NotSupportedException($"Duplicate column '{columnPath}' detected. Column names are case insensitive and must be unique.");
}

Expand Down
194 changes: 100 additions & 94 deletions src/ParquetViewer.Engine/ParquetEngine.Processor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,26 +82,25 @@ private async Task ProcessRowGroup(DataTableLite dataTable, ParquetRowGroupReade
{
cancellationToken.ThrowIfCancellationRequested();

var field = ParquetSchemaTree.GetChild(column.Parent, column.Name);
if (field.SchemaElement.LogicalType?.LIST is not null || field.SchemaElement.ConvertedType == Parquet.Meta.ConvertedType.LIST)
var field = column.ParentSchema.GetChild(column.Name);
switch (field.FieldType())
{
await ReadListField(dataTable, groupReader, rowBeginIndex, field, skipRecords,
readRecords, isFirstColumn, cancellationToken, progress);
}
else if (field.SchemaElement.LogicalType?.MAP is not null || field.SchemaElement.ConvertedType == Parquet.Meta.ConvertedType.MAP)
{
await ReadMapField(dataTable, groupReader, rowBeginIndex, field, skipRecords,
readRecords, isFirstColumn, cancellationToken, progress);
}
else if (field.SchemaElement.NumChildren > 0) //Struct
{
await ReadStructField(dataTable, groupReader, rowBeginIndex, field, skipRecords,
readRecords, isFirstColumn, cancellationToken, progress);
}
else
{
await ReadPrimitiveField(dataTable, groupReader, rowBeginIndex, field, skipRecords,
readRecords, isFirstColumn, cancellationToken, progress);
case ParquetSchemaElement.FieldTypeId.Primitive:
await ReadPrimitiveField(dataTable, groupReader, rowBeginIndex, field, skipRecords,
readRecords, isFirstColumn, cancellationToken, progress);
break;
case ParquetSchemaElement.FieldTypeId.List:
await ReadListField(dataTable, groupReader, rowBeginIndex, field, skipRecords,
readRecords, isFirstColumn, cancellationToken, progress);
break;
case ParquetSchemaElement.FieldTypeId.Map:
await ReadMapField(dataTable, groupReader, rowBeginIndex, field, skipRecords,
readRecords, isFirstColumn, cancellationToken, progress);
break;
case ParquetSchemaElement.FieldTypeId.Struct:
await ReadStructField(dataTable, groupReader, rowBeginIndex, field, skipRecords,
readRecords, isFirstColumn, cancellationToken, progress);
break;
}

isFirstColumn = false;
Expand Down Expand Up @@ -163,82 +162,86 @@ private async Task ReadPrimitiveField(DataTableLite dataTable, ParquetRowGroupRe
private async Task ReadListField(DataTableLite dataTable, ParquetRowGroupReader groupReader, int rowBeginIndex, ParquetSchemaElement field,
long skipRecords, long readRecords, bool isFirstColumn, CancellationToken cancellationToken, IProgress<int>? progress)
{
var listField = field.GetChild("list");
var listField = field.GetSingle("list");
ParquetSchemaElement itemField;
try
{
itemField = listField.GetImmediateChildOrSingle("item"); //Not all parquet files follow the same format so we're being lax with getting the child here
itemField = listField.GetSingle("item");
}
catch (Exception ex)
{
throw new UnsupportedFieldException($"Cannot load field `{field.Path}`. Invalid List type.", ex);
}

if (itemField.Children.Any())
throw new UnsupportedFieldException($"Cannot load field `{field.Path}`. Nested list types are not supported.");

int rowIndex = rowBeginIndex;

int skippedRecords = 0;
var dataColumn = await groupReader.ReadColumnAsync(itemField.DataField!, cancellationToken);

ArrayList? rowValue = null;
var fieldIndex = dataTable.Columns[field.Path]!.Ordinal;
for (int i = 0; i < dataColumn.Data.Length; i++)
if (itemField.FieldType() == ParquetSchemaElement.FieldTypeId.Primitive)
{
cancellationToken.ThrowIfCancellationRequested();

rowValue ??= new ArrayList();
int rowIndex = rowBeginIndex;

bool IsEndOfRow() => (i + 1) == dataColumn.RepetitionLevels!.Length
|| dataColumn.RepetitionLevels[i + 1] == 0; //0 means new list
int skippedRecords = 0;
var dataColumn = await groupReader.ReadColumnAsync(itemField.DataField!, cancellationToken);

//Skip rows
while (skipRecords > skippedRecords)
ArrayList? rowValue = null;
var fieldIndex = dataTable.Columns[field.Path]!.Ordinal;
for (int i = 0; i < dataColumn.Data.Length; i++)
{
if (IsEndOfRow())
skippedRecords++;
cancellationToken.ThrowIfCancellationRequested();

i++;
}
rowValue ??= new ArrayList();

//If we skipped to the end then just exit
if (i == dataColumn.Data.Length)
break;
bool IsEndOfRow() => (i + 1) == dataColumn.RepetitionLevels!.Length
|| dataColumn.RepetitionLevels[i + 1] == 0; //0 means new list

if (IsEndOfRow())
{
if (isFirstColumn)
//Skip rows
while (skipRecords > skippedRecords)
{
dataTable.NewRow();
if (IsEndOfRow())
skippedRecords++;

i++;
}

var lastItem = dataColumn.Data.GetValue(i) ?? DBNull.Value;
rowValue.Add(lastItem);
//If we skipped to the end then just exit
if (i == dataColumn.Data.Length)
break;

if (IsEndOfRow())
{
if (isFirstColumn)
{
dataTable.NewRow();
}

dataTable.Rows[rowIndex]![fieldIndex] = new ListValue(rowValue, itemField.DataField!.ClrType);
rowValue = null;
var lastItem = dataColumn.Data.GetValue(i) ?? DBNull.Value;
rowValue.Add(lastItem);

rowIndex++;
progress?.Report(1);
dataTable.Rows[rowIndex]![fieldIndex] = new ListValue(rowValue, itemField.DataField!.ClrType);
rowValue = null;

if (rowIndex - rowBeginIndex >= readRecords)
break;
}
else
{
var value = dataColumn.Data.GetValue(i) ?? DBNull.Value;
rowValue.Add(value);
rowIndex++;
progress?.Report(1);

if (rowIndex - rowBeginIndex >= readRecords)
break;
}
else
{
var value = dataColumn.Data.GetValue(i) ?? DBNull.Value;
rowValue.Add(value);
}
}
}
else
{
throw new NotSupportedException($"Lists of {itemField.FieldType()}s are not currently supported");
}
}

private async Task ReadMapField(DataTableLite dataTable, ParquetRowGroupReader groupReader, int rowBeginIndex, ParquetSchemaElement field,
long skipRecords, long readRecords, bool isFirstColumn, CancellationToken cancellationToken, IProgress<int>? progress)
{
var keyValueField = field.GetChild("key_value");
var keyField = keyValueField.GetChild("key");
var valueField = keyValueField.GetChild("value");
var keyValueField = field.GetSingle("key_value");
var keyField = keyValueField.GetChildCI("key");
var valueField = keyValueField.GetChildCI("value");

if (keyField.Children.Any() || valueField.Children.Any())
throw new UnsupportedFieldException($"Cannot load field `{field.Path}`. Nested map types are not supported");
Expand All @@ -249,8 +252,9 @@ private async Task ReadMapField(DataTableLite dataTable, ParquetRowGroupReader g
var keyDataColumn = await groupReader.ReadColumnAsync(keyField.DataField!, cancellationToken);
var valueDataColumn = await groupReader.ReadColumnAsync(valueField.DataField!, cancellationToken);

int rowCount = Math.Max(keyDataColumn.Data.Length, valueDataColumn.Data.Length);
var fieldIndex = dataTable.Columns[field.Path]!.Ordinal;
for (int i = 0; i < valueDataColumn.Data.Length; i++)
for (int i = 0; i < rowCount; i++)
{
cancellationToken.ThrowIfCancellationRequested();

Expand All @@ -265,12 +269,8 @@ private async Task ReadMapField(DataTableLite dataTable, ParquetRowGroupReader g
dataTable.NewRow();
}

bool isMapTypeValid = keyDataColumn.Data.Length == valueDataColumn.Data.Length;
if (!isMapTypeValid)
throw new UnsupportedFieldException($"`{field.Path}` is malformed and cannot be loaded");

var key = keyDataColumn.Data.GetValue(i) ?? DBNull.Value;
var value = valueDataColumn.Data.GetValue(i) ?? DBNull.Value;
var key = keyDataColumn.Data.Length > i ? keyDataColumn.Data.GetValue(i) ?? DBNull.Value : DBNull.Value;
var value = valueDataColumn.Data.Length > i ? valueDataColumn.Data.GetValue(i) ?? DBNull.Value : DBNull.Value;
dataTable.Rows[rowIndex]![fieldIndex] = new MapValue(key, keyField.DataField!.ClrType, value, valueField.DataField!.ClrType);

rowIndex++;
Expand All @@ -285,33 +285,16 @@ private async Task ReadStructField(DataTableLite dataTable, ParquetRowGroupReade
long skipRecords, long readRecords, bool isFirstColumn, CancellationToken cancellationToken, IProgress<int>? progress)
{
//Read struct data as a new datatable
DataTableLite structFieldTable = BuildDataTable(field.Path, field.Children.Select(f => f.Path).ToList(), 1);
DataTableLite structFieldTable = BuildDataTable(field, field.Children.Select(f => f.Path).ToList(), 1);

//Need to calculate progress differently for structs
var structFieldReadProgress = new SimpleProgress();
structFieldReadProgress.ProgressChanged += (int progressSoFar) =>
{
if (structFieldTable.Columns.Count > 0)
{
//To report progress accurately we'll need to divide the progress total
//by the field count to convert it to row count in the main data table.
var increment = progressSoFar % structFieldTable.Columns.Count;
if (increment == 0)
progress?.Report(1);
}
else
{
//If the struct field has no columns, then each read is one row.
progress?.Report(1);
}
};
var structFieldReadProgress = StructReadProgress(progress, structFieldTable.Columns.Count);

//Read the struct data and populate the datatable
await ProcessRowGroup(structFieldTable, groupReader, skipRecords, readRecords, cancellationToken, structFieldReadProgress);

var rowIndex = rowBeginIndex;
var fieldIndex = dataTable.Columns[field.Path]?.Ordinal ?? throw new Exception($"Column `{field.Path}` is missing");

var finalResultDataTable = structFieldTable.ToDataTable(cancellationToken);
for (var i = 0; i < finalResultDataTable.Rows.Count; i++)
{
Expand All @@ -327,12 +310,35 @@ private async Task ReadStructField(DataTableLite dataTable, ParquetRowGroupReade
}
}

private DataTableLite BuildDataTable(string? parent, List<string> fields, int expectedRecordCount)
private SimpleProgress StructReadProgress(IProgress<int>? _progress, int fieldCount)
{
    //Wraps the caller's progress reporter so that per-field read progress from the
    //struct's inner table is translated into per-row progress on the outer table.
    var structProgress = new SimpleProgress();
    structProgress.ProgressChanged += (int progressSoFar) =>
    {
        if (fieldCount <= 0)
        {
            //A struct field with no columns means each read already equals one row.
            _progress?.Report(1);
            return;
        }

        //Each outer row costs `fieldCount` inner field reads, so only bubble up
        //one unit of progress once a whole row's worth of fields has been read.
        if (progressSoFar % fieldCount == 0)
            _progress?.Report(1);
    };
    return structProgress;
}

private DataTableLite BuildDataTable(ParquetSchemaElement? parent, List<string> fields, int expectedRecordCount)
{
parent ??= this.ParquetSchemaTree;
DataTableLite dataTable = new(expectedRecordCount);
foreach (var field in fields)
{
var schema = ParquetSchemaTree.GetChild(parent, field);
var schema = parent.GetChild(field);
if (schema.SchemaElement.ConvertedType == ConvertedType.LIST)
{
dataTable.AddColumn(field, typeof(ListValue), parent);
Expand Down
Loading