Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parquet - writing & reading nullable structures with nullable fields #677

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/lib/dremel/src/Flow/Dremel/Dremel.php
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public function shred(array $data, int $maxDefinitionLevel) : DataShredded
return new DataShredded(
$this->buildRepetitions($data),
$definitions,
array_flatten($data)
\array_values(\array_filter(array_flatten($data), static fn ($item) => $item !== null))
);
}

Expand Down Expand Up @@ -168,7 +168,7 @@ private function buildDefinitions(array $data, array &$definitions, int $maxDefi
$this->buildDefinitions($value, $definitions, $maxDefinitionLevel);
} else {
if ($value === null) {
$definitions[] = $maxDefinitionLevel - 1;
$definitions[] = 0;
} else {
$definitions[] = $maxDefinitionLevel;
}
Expand Down
19 changes: 19 additions & 0 deletions src/lib/dremel/tests/Flow/Dremel/Tests/Integration/DremelTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,23 @@ public function test_dremel_shredding_and_assembling() : void

$this->assertSame($values, $assembledValues);
}

/**
 * Round-trips a list column in which every second row is null.
 *
 * Shredding is expected to emit definition level 0 for each null row and to
 * leave nulls out of the flat values; assembling must restore the input rows.
 */
public function test_dremel_shredding_and_assembling_nullable_nested_values() : void
{
    $rows = [[0, 1], null, [0, 1, 2, 3, 4, 5], null, [0, 1], null, [0, 1, 2, 3, 4], null, [0, 1, 2, 3, 4], null];

    // One repetition/definition entry per list element plus one per null row (25 in total).
    $expectedRepetitions = [0, 1, 0, 0, 1, 1, 1, 1, 1, 0, 0, 1, 0, 0, 1, 1, 1, 1, 0, 0, 1, 1, 1, 1, 0];
    $expectedDefinitions = [2, 2, 0, 2, 2, 2, 2, 2, 2, 0, 2, 2, 0, 2, 2, 2, 2, 2, 0, 2, 2, 2, 2, 2, 0];
    // Only the 20 non-null elements are expected in the shredded values.
    $expectedFlatValues = [0, 1, 0, 1, 2, 3, 4, 5, 0, 1, 0, 1, 2, 3, 4, 0, 1, 2, 3, 4];

    $shredder = new Dremel();
    $result = $shredder->shred($rows, 2);

    $this->assertSame($expectedRepetitions, $result->repetitions);
    $this->assertSame($expectedDefinitions, $result->definitions);
    $this->assertSame($expectedFlatValues, $result->values);

    $reassembled = \iterator_to_array(
        $shredder->assemble($result->repetitions, $result->definitions, $expectedFlatValues)
    );

    $this->assertSame($rows, $reassembled);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public function test_decode_flat_column_of_integers_where_every_second_one_is_nu
{
$repetitions = [];
$definitions = [1, 0, 1, 0, 1, 0, 1, 0, 1, 0];
$values = [0, 2, 4, 6, 8, null, null, null, null, null];
$values = [0, 2, 4, 6, 8];

$this->assertSame(
[0, null, 2, null, 4, null, 6, null, 8, null],
Expand Down
38 changes: 38 additions & 0 deletions src/lib/parquet/resources/python/generators/structs.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,41 @@ def generate_struct_flat():

return struct_flat_data

def generate_struct_flat_nullable():
    """Build the data for the `struct_flat_nullable` column.

    Returns a list of `n_rows` entries. Odd rows are `None` (the whole struct
    is null). Even rows are dicts with the flat-struct fields; the
    `*_nullable` fields are populated on every second struct (rows divisible
    by 4) and `None` on the others, so the column covers both a null struct
    and null fields inside a non-null struct.

    NOTE(review): the previous version guarded the nullable fields with
    `i % 2 == 0`, which is always true once odd rows are skipped, so those
    fields were never null. The parquet fixture has to be regenerated for
    this change to show up in the reader tests.
    """
    struct_flat_data = []
    for i in range(n_rows):
        # Odd rows: the struct itself is null.
        if i % 2 != 0:
            struct_flat_data.append(None)
            continue

        # `i` is even from here on; alternate between filled and null
        # nullable fields across the non-null structs.
        nullable_fields_present = i % 4 == 0

        struct_flat_data.append({
            'string': f'string_{i}',
            'string_nullable': f'string_{i}' if nullable_fields_present else None,
            'int': i,
            'int_nullable': i if nullable_fields_present else None,
            # Same value the original `i % 2 == 0` produced for even rows.
            'bool': True,
            'bool_nullable': True if nullable_fields_present else None,
            'list_of_ints': [random.randint(1, 10) for _ in range(3)],
            'list_of_strings': [f'str_{j}' for j in range(3)],
            'map_of_string_int': {f'key_{j}': j for j in range(3)},
            'map_of_int_int': {j: j for j in range(3)}
        })

    return struct_flat_data

def generate_struct_nested():
struct_nested_data = []
for i in range(n_rows):
Expand Down Expand Up @@ -217,6 +252,7 @@ def generate_struct_deeply_nested():

# Columns
struct_flat_col = generate_struct_flat()
struct_flat_nullable_col = generate_struct_flat_nullable()
struct_nested_col = generate_struct_nested()
struct_nested_with_list_of_lists_col = generate_struct_nested_with_list_of_lists()
struct_nested_with_list_of_maps_col = generate_struct_nested_with_list_of_maps()
Expand All @@ -228,6 +264,7 @@ def generate_struct_deeply_nested():
# Creating the DataFrame with only the new column
df_nested_list = pd.DataFrame({
'struct_flat': struct_flat_col,
'struct_flat_nullable': struct_flat_nullable_col,
'struct_nested': struct_nested_col,
'struct_nested_with_list_of_lists': struct_nested_with_list_of_lists_col,
'struct_nested_with_list_of_maps': struct_nested_with_list_of_maps_col,
Expand Down Expand Up @@ -362,6 +399,7 @@ def generate_struct_deeply_nested():
# Define the schema
schema = pa.schema([
('struct_flat', struct_flat_type),
('struct_flat_nullable', struct_flat_type),
('struct_nested', struct_nested_type),
('struct_nested_with_list_of_lists', struct_nested_with_list_of_lists_type),
('struct_nested_with_list_of_maps', struct_nested_with_list_of_maps_type),
Expand Down
13 changes: 12 additions & 1 deletion src/lib/parquet/src/Flow/Parquet/ParquetFile.php
Original file line number Diff line number Diff line change
Expand Up @@ -338,12 +338,23 @@ private function readStruct(NestedColumn $structColumn, bool $isCollection = fal
} else {
$row = [];

$isNull = true;

foreach ($childrenRowData as $childColumnPath => $childColumnValue) {
$childColumn = $this->schema()->get($childColumnPath);

$row[$childColumn->name()] = $childColumnValue;

if ($childColumnValue !== null) {
$isNull = false;
}
}

if ($isNull) {
yield null;
} else {
yield $row;
}
yield $row;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ public function flattenColumn(Column $column, array $row) : array
}

/** @var NestedColumn $column */
if (!\is_array($columnData) && ($column->isMap() || $column->isList())) {
return [];
}

if ($column->isList()) {
$listElementColumn = $column->getListElement();

if ($columnData === null) {
return [
$listElementColumn->flatPath() => null,
];
}

if ($listElementColumn instanceof FlatColumn) {
return [
$listElementColumn->flatPath() => $columnData,
Expand All @@ -53,6 +55,13 @@ public function flattenColumn(Column $column, array $row) : array
$keyColumn = $column->getMapKeyColumn();
$valueColumn = $column->getMapValueColumn();

if ($columnData === null) {
return [
$keyColumn->flatPath() => null,
$valueColumn->flatPath() => null,
];
}

if ($valueColumn instanceof FlatColumn) {
return [
$keyColumn->flatPath() => \array_keys($columnData),
Expand All @@ -78,7 +87,7 @@ public function flattenColumn(Column $column, array $row) : array
$data = [];

foreach ($column->children() as $child) {
$data = \array_merge($data, $this->flattenColumn($child, $columnData));
$data = \array_merge($data, $this->flattenColumn($child, $columnData ?? [$child->name() => null]));
}

return $data;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,6 @@ public function packValues(FlatColumn $column, array $values) : string
$parquetValues = [];

foreach ($values as $value) {
if ($value === null) {
continue;
}

$parquetValues[] = $this->dataConverter->toParquetType($column, $value);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public function valuesCount() : int
$count = 0;

foreach ($this->dataPageContainers as $pageContainer) {
$count += \count($pageContainer->values);
$count += $pageContainer->pageHeader->dataValuesCount();
}

return $count;
Expand Down
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,30 @@ public function test_writing_list_of_strings() : void
\iterator_to_array((new Reader())->read($path)->values())
);
}

/**
 * Writes 100 rows of a nullable list<int32> column and reads them back.
 *
 * Even-numbered rows hold 2 to 10 random integers between 0 and
 * Consts::PHP_INT32_MAX, odd-numbered rows hold null. The reader must return
 * exactly the rows that were written.
 */
public function test_writing_nullable_list_of_ints() : void
{
    $path = \sys_get_temp_dir() . '/test-writer' . \uniqid('parquet-test-', true) . '.parquet';

    $writer = new Writer();
    $schema = Schema::with(NestedColumn::list('list_of_ints', ListElement::int32()));

    $faker = Factory::create();
    // One row per index; array_map over a list already yields a 0-based list of rows.
    $inputData = \array_map(static function (int $i) use ($faker) : array {
        return [
            'list_of_ints' => $i % 2 === 0
                ? \array_map(
                    static fn (int $position) => $faker->numberBetween(0, Consts::PHP_INT32_MAX),
                    \range(1, \random_int(2, 10))
                )
                : null,
        ];
    }, \range(1, 100));

    try {
        $writer->write($path, $schema, $inputData);

        $this->assertSame(
            $inputData,
            \iterator_to_array((new Reader())->read($path)->values())
        );
    } finally {
        // Do not leave the temporary parquet file behind, even when the test fails.
        if (\file_exists($path)) {
            \unlink($path);
        }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,35 @@ public function test_writing_map_of_int_string() : void
\iterator_to_array((new Reader())->read($path)->values())
);
}

/**
 * Writes 100 rows of a nullable map<int32, int32> column and reads them back.
 *
 * Even-indexed rows hold a map with 2 to 10 random values between 0 and
 * Consts::PHP_INT32_MAX, odd-indexed rows hold null. The reader must return
 * exactly the rows that were written.
 */
public function test_writing_nullable_map_of_int_int() : void
{
    $path = \sys_get_temp_dir() . '/test-writer' . \uniqid('parquet-test-', true) . '.parquet';

    $writer = new Writer();
    $schema = Schema::with(NestedColumn::map('map_int_int', MapKey::int32(), MapValue::int32()));

    $faker = Factory::create();
    // One row per index; array_map over a list already yields a 0-based list of rows.
    $inputData = \array_map(static function (int $i) use ($faker) : array {
        return [
            // array_merge() renumbers integer keys, so each map ends up keyed from 0.
            'map_int_int' => $i % 2 === 0
                ? \array_merge(
                    ...\array_map(
                        static fn (int $key) => [$key => $faker->numberBetween(0, Consts::PHP_INT32_MAX)],
                        \range(1, \random_int(2, 10))
                    )
                )
                : null,
        ];
    }, \range(0, 99));

    try {
        $writer->write($path, $schema, $inputData);

        $this->assertSame(
            $inputData,
            \iterator_to_array((new Reader())->read($path)->values())
        );
    } finally {
        // Do not leave the temporary parquet file behind, even when the test fails.
        if (\file_exists($path)) {
            \unlink($path);
        }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,18 @@ public function test_reading_structs_schema_ddl() : void
NestedColumn::map('map_of_string_int', MapKey::string(), MapValue::int32()),
NestedColumn::map('map_of_int_int', MapKey::int32(), MapValue::int32()),
]),
NestedColumn::struct('struct_flat_nullable', [
FlatColumn::string('string'),
FlatColumn::string('string_nullable'),
FlatColumn::int32('int'),
FlatColumn::int32('int_nullable'),
FlatColumn::boolean('bool'),
FlatColumn::boolean('bool_nullable'),
NestedColumn::list('list_of_ints', ListElement::int32()),
NestedColumn::list('list_of_strings', ListElement::string()),
NestedColumn::map('map_of_string_int', MapKey::string(), MapValue::int32()),
NestedColumn::map('map_of_int_int', MapKey::int32(), MapValue::int32()),
]),
NestedColumn::struct('struct_nested', [
FlatColumn::string('string'),
NestedColumn::struct('struct_flat', [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,31 @@ public function test_reading_struct_flat_column() : void
$this->assertSame($file->metadata()->rowsNumber(), $count);
}

/**
 * Reads the `struct_flat_nullable` column from the structs fixture.
 *
 * Rows yielded under an even key must be structs exposing the checked fields,
 * rows under an odd key must be null, and 100 rows must be read in total.
 */
public function test_reading_struct_flat_nullable_column() : void
{
    $reader = new Reader(logger: $this->getLogger());
    $file = $reader->read(__DIR__ . '/../../Fixtures/structs.parquet');

    $this->assertNull($file->metadata()->schema()->get('struct_flat_nullable')->type());
    $this->assertNull($file->metadata()->schema()->get('struct_flat_nullable')->logicalType());

    $expectedKeys = ['string', 'int', 'list_of_ints', 'map_of_string_int'];
    $rowsSeen = 0;

    foreach ($file->values(['struct_flat_nullable']) as $index => $row) {
        $rowsSeen++;

        // Odd rows carry no struct at all.
        if ($index % 2 !== 0) {
            $this->assertNull($row['struct_flat_nullable']);

            continue;
        }

        foreach ($expectedKeys as $expectedKey) {
            $this->assertArrayHasKey($expectedKey, $row['struct_flat_nullable']);
        }
    }

    $this->assertSame(100, $rowsSeen);
    $this->assertSame($file->metadata()->rowsNumber(), $rowsSeen);
}

public function test_reading_struct_nested_column() : void
{
$reader = new Reader(logger: $this->getLogger());
Expand Down
Loading