Skip to content

Commit

Permalink
Parquet - writing & reading nullable structures with nullable fields (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
norberttech authored Oct 30, 2023
1 parent 0ed3860 commit c2bf1c3
Show file tree
Hide file tree
Showing 15 changed files with 331 additions and 14 deletions.
4 changes: 2 additions & 2 deletions src/lib/dremel/src/Flow/Dremel/Dremel.php
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public function shred(array $data, int $maxDefinitionLevel) : DataShredded
return new DataShredded(
$this->buildRepetitions($data),
$definitions,
array_flatten($data)
\array_values(\array_filter(array_flatten($data), static fn ($item) => $item !== null))
);
}

Expand Down Expand Up @@ -168,7 +168,7 @@ private function buildDefinitions(array $data, array &$definitions, int $maxDefi
$this->buildDefinitions($value, $definitions, $maxDefinitionLevel);
} else {
if ($value === null) {
$definitions[] = $maxDefinitionLevel - 1;
$definitions[] = 0;
} else {
$definitions[] = $maxDefinitionLevel;
}
Expand Down
19 changes: 19 additions & 0 deletions src/lib/dremel/tests/Flow/Dremel/Tests/Integration/DremelTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,23 @@ public function test_dremel_shredding_and_assembling() : void

$this->assertSame($values, $assembledValues);
}

/**
 * Shreds a top-level list whose entries alternate between lists of ints and
 * nulls, then assembles it back.
 *
 * The expectations below pin that every null entry produces definition level 0
 * and contributes nothing to the flat values, while every int is emitted at
 * the maximum definition level (2).
 */
public function test_dremel_shredding_and_assembling_nullable_nested_values() : void
{
    $input = [[0, 1], null, [0, 1, 2, 3, 4, 5], null, [0, 1], null, [0, 1, 2, 3, 4], null, [0, 1, 2, 3, 4], null];

    $expectedRepetitions = [0, 1, 0, 0, 1, 1, 1, 1, 1, 0, 0, 1, 0, 0, 1, 1, 1, 1, 0, 0, 1, 1, 1, 1, 0];
    $expectedDefinitions = [2, 2, 0, 2, 2, 2, 2, 2, 2, 0, 2, 2, 0, 2, 2, 2, 2, 2, 0, 2, 2, 2, 2, 2, 0];
    $expectedFlatValues = [0, 1, 0, 1, 2, 3, 4, 5, 0, 1, 0, 1, 2, 3, 4, 0, 1, 2, 3, 4];

    $dremel = new Dremel();
    $result = $dremel->shred($input, 2);

    $this->assertSame($expectedRepetitions, $result->repetitions);
    $this->assertSame($expectedDefinitions, $result->definitions);
    $this->assertSame($expectedFlatValues, $result->values);

    // Round trip: the shredded levels plus the null-free values must rebuild the input.
    $roundTrip = \iterator_to_array(
        $dremel->assemble($result->repetitions, $result->definitions, $expectedFlatValues)
    );

    $this->assertSame($input, $roundTrip);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public function test_decode_flat_column_of_integers_where_every_second_one_is_nu
{
$repetitions = [];
$definitions = [1, 0, 1, 0, 1, 0, 1, 0, 1, 0];
$values = [0, 2, 4, 6, 8, null, null, null, null, null];
$values = [0, 2, 4, 6, 8];

$this->assertSame(
[0, null, 2, null, 4, null, 6, null, 8, null],
Expand Down
38 changes: 38 additions & 0 deletions src/lib/parquet/resources/python/generators/structs.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,41 @@ def generate_struct_flat():

return struct_flat_data

def generate_struct_flat_nullable():
    """Build the `struct_flat_nullable` column: one entry per row in `n_rows`.

    Rows with an odd index are ``None`` (the whole struct is null); rows with
    an even index hold a flat struct with scalar, list and map fields.
    """
    rows = []
    for i in range(n_rows):
        is_even = i % 2 == 0

        if not is_even:
            # Odd rows carry no struct at all.
            rows.append(None)
            continue

        # NOTE: only even rows reach this point, so the `*_nullable` fields
        # below always take their non-None branch.
        rows.append({
            'string': f'string_{i}',
            'string_nullable': f'string_{i}' if is_even else None,
            'int': i,
            'int_nullable': i if is_even else None,
            'bool': is_even,
            'bool_nullable': is_even if is_even else None,
            'list_of_ints': [random.randint(1, 10) for _ in range(3)],
            'list_of_strings': [f'str_{j}' for j in range(3)],
            'map_of_string_int': {f'key_{j}': j for j in range(3)},
            'map_of_int_int': {j: j for j in range(3)},
        })

    return rows

def generate_struct_nested():
struct_nested_data = []
for i in range(n_rows):
Expand Down Expand Up @@ -217,6 +252,7 @@ def generate_struct_deeply_nested():

# Columns
struct_flat_col = generate_struct_flat()
struct_flat_nullable_col = generate_struct_flat_nullable()
struct_nested_col = generate_struct_nested()
struct_nested_with_list_of_lists_col = generate_struct_nested_with_list_of_lists()
struct_nested_with_list_of_maps_col = generate_struct_nested_with_list_of_maps()
Expand All @@ -228,6 +264,7 @@ def generate_struct_deeply_nested():
# Creating the DataFrame with only the new column
df_nested_list = pd.DataFrame({
'struct_flat': struct_flat_col,
'struct_flat_nullable': struct_flat_nullable_col,
'struct_nested': struct_nested_col,
'struct_nested_with_list_of_lists': struct_nested_with_list_of_lists_col,
'struct_nested_with_list_of_maps': struct_nested_with_list_of_maps_col,
Expand Down Expand Up @@ -362,6 +399,7 @@ def generate_struct_deeply_nested():
# Define the schema
schema = pa.schema([
('struct_flat', struct_flat_type),
('struct_flat_nullable', struct_flat_type),
('struct_nested', struct_nested_type),
('struct_nested_with_list_of_lists', struct_nested_with_list_of_lists_type),
('struct_nested_with_list_of_maps', struct_nested_with_list_of_maps_type),
Expand Down
13 changes: 12 additions & 1 deletion src/lib/parquet/src/Flow/Parquet/ParquetFile.php
Original file line number Diff line number Diff line change
Expand Up @@ -338,12 +338,23 @@ private function readStruct(NestedColumn $structColumn, bool $isCollection = fal
} else {
$row = [];

$isNull = true;

foreach ($childrenRowData as $childColumnPath => $childColumnValue) {
$childColumn = $this->schema()->get($childColumnPath);

$row[$childColumn->name()] = $childColumnValue;

if ($childColumnValue !== null) {
$isNull = false;
}
}

if ($isNull) {
yield null;
} else {
yield $row;
}
yield $row;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ public function flattenColumn(Column $column, array $row) : array
}

/** @var NestedColumn $column */
if (!\is_array($columnData) && ($column->isMap() || $column->isList())) {
return [];
}

if ($column->isList()) {
$listElementColumn = $column->getListElement();

if ($columnData === null) {
return [
$listElementColumn->flatPath() => null,
];
}

if ($listElementColumn instanceof FlatColumn) {
return [
$listElementColumn->flatPath() => $columnData,
Expand All @@ -53,6 +55,13 @@ public function flattenColumn(Column $column, array $row) : array
$keyColumn = $column->getMapKeyColumn();
$valueColumn = $column->getMapValueColumn();

if ($columnData === null) {
return [
$keyColumn->flatPath() => null,
$valueColumn->flatPath() => null,
];
}

if ($valueColumn instanceof FlatColumn) {
return [
$keyColumn->flatPath() => \array_keys($columnData),
Expand All @@ -78,7 +87,7 @@ public function flattenColumn(Column $column, array $row) : array
$data = [];

foreach ($column->children() as $child) {
$data = \array_merge($data, $this->flattenColumn($child, $columnData));
$data = \array_merge($data, $this->flattenColumn($child, $columnData ?? [$child->name() => null]));
}

return $data;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,6 @@ public function packValues(FlatColumn $column, array $values) : string
$parquetValues = [];

foreach ($values as $value) {
if ($value === null) {
continue;
}

$parquetValues[] = $this->dataConverter->toParquetType($column, $value);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public function valuesCount() : int
$count = 0;

foreach ($this->dataPageContainers as $pageContainer) {
$count += \count($pageContainer->values);
$count += $pageContainer->pageHeader->dataValuesCount();
}

return $count;
Expand Down
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,30 @@ public function test_writing_list_of_strings() : void
\iterator_to_array((new Reader())->read($path)->values())
);
}

/**
 * Writes 100 rows of a `list_of_ints` column in which every odd-numbered row
 * is null and every even-numbered row is a list of 2 to 10 random int32
 * values, then expects the reader to return exactly the rows that were written.
 */
public function test_writing_nullable_list_of_ints() : void
{
    $path = \sys_get_temp_dir() . '/test-writer' . \uniqid('parquet-test-', true) . '.parquet';

    $writer = new Writer();
    $schema = Schema::with(NestedColumn::list('list_of_ints', ListElement::int32()));

    $faker = Factory::create();
    $inputData = [];

    foreach (\range(1, 100) as $rowNumber) {
        $list = null;

        if ($rowNumber % 2 === 0) {
            $list = [];
            $length = \random_int(2, 10);

            for ($position = 1; $position <= $length; $position++) {
                $list[] = $faker->numberBetween(0, Consts::PHP_INT32_MAX);
            }
        }

        $inputData[] = ['list_of_ints' => $list];
    }

    $writer->write($path, $schema, $inputData);

    $rowsRead = \iterator_to_array((new Reader())->read($path)->values());

    $this->assertSame($inputData, $rowsRead);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,35 @@ public function test_writing_map_of_int_string() : void
\iterator_to_array((new Reader())->read($path)->values())
);
}

/**
 * Writes 100 rows of a `map_int_int` column in which rows with an even index
 * hold a map of 2 to 10 int32 entries and rows with an odd index are null,
 * then expects the reader to return exactly the rows that were written.
 */
public function test_writing_nullable_map_of_int_int() : void
{
$path = \sys_get_temp_dir() . '/test-writer' . \uniqid('parquet-test-', true) . '.parquet';

$writer = new Writer();
$schema = Schema::with(NestedColumn::map('map_int_int', MapKey::int32(), MapValue::int32()));

$faker = Factory::create();
// The outer array_merge flattens the one-row lists returned per index into a single list of rows.
$inputData = \array_merge(...\array_map(static function (int $i) use ($faker) : array {
return [
[
'map_int_int' => $i % 2 === 0
// NOTE: array_merge renumbers integer keys from zero, so although each
// single-entry array below is keyed by 1..n, the resulting map is keyed 0..n-1.
? \array_merge(
...\array_map(
static fn ($i) => [$i => $faker->numberBetween(0, Consts::PHP_INT32_MAX)],
\range(1, \random_int(2, 10))
)
)
: null,
],
];
}, \range(0, 99)));

$writer->write($path, $schema, $inputData);

// Round trip: rows read back from the file must be identical to the input rows.
$this->assertSame(
$inputData,
\iterator_to_array((new Reader())->read($path)->values())
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,18 @@ public function test_reading_structs_schema_ddl() : void
NestedColumn::map('map_of_string_int', MapKey::string(), MapValue::int32()),
NestedColumn::map('map_of_int_int', MapKey::int32(), MapValue::int32()),
]),
NestedColumn::struct('struct_flat_nullable', [
FlatColumn::string('string'),
FlatColumn::string('string_nullable'),
FlatColumn::int32('int'),
FlatColumn::int32('int_nullable'),
FlatColumn::boolean('bool'),
FlatColumn::boolean('bool_nullable'),
NestedColumn::list('list_of_ints', ListElement::int32()),
NestedColumn::list('list_of_strings', ListElement::string()),
NestedColumn::map('map_of_string_int', MapKey::string(), MapValue::int32()),
NestedColumn::map('map_of_int_int', MapKey::int32(), MapValue::int32()),
]),
NestedColumn::struct('struct_nested', [
FlatColumn::string('string'),
NestedColumn::struct('struct_flat', [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,31 @@ public function test_reading_struct_flat_column() : void
$this->assertSame($file->metadata()->rowsNumber(), $count);
}

/**
 * Reads the `struct_flat_nullable` column from the structs fixture and checks
 * that rows at even positions contain a struct exposing the expected keys,
 * rows at odd positions are null, and exactly 100 rows are returned.
 */
public function test_reading_struct_flat_nullable_column() : void
{
    $reader = new Reader(logger: $this->getLogger());
    $file = $reader->read(__DIR__ . '/../../Fixtures/structs.parquet');

    // A struct column is expected to carry neither a physical nor a logical type.
    $this->assertNull($file->metadata()->schema()->get('struct_flat_nullable')->type());
    $this->assertNull($file->metadata()->schema()->get('struct_flat_nullable')->logicalType());

    $expectedKeys = ['string', 'int', 'list_of_ints', 'map_of_string_int'];
    $rowsSeen = 0;

    foreach ($file->values(['struct_flat_nullable']) as $index => $row) {
        $rowsSeen++;

        if ($index % 2 !== 0) {
            $this->assertNull($row['struct_flat_nullable']);

            continue;
        }

        foreach ($expectedKeys as $key) {
            $this->assertArrayHasKey($key, $row['struct_flat_nullable']);
        }
    }

    $this->assertSame(100, $rowsSeen);
    $this->assertSame($file->metadata()->rowsNumber(), $rowsSeen);
}

public function test_reading_struct_nested_column() : void
{
$reader = new Reader(logger: $this->getLogger());
Expand Down
Loading

0 comments on commit c2bf1c3

Please sign in to comment.