From 3266381621a5299f81537a03eb60f29bc53dd610 Mon Sep 17 00:00:00 2001 From: Norbert Orzechowicz Date: Sun, 29 Oct 2023 20:55:03 +0100 Subject: [PATCH] Added support for writing nullable columns into parquet --- .../PageBuilder/DataPageBuilder.php | 2 +- .../FloatDictionaryBuilder.php | 4 + .../ObjectDictionaryBuilder.php | 4 + .../ScalarDictionaryBuilder.php | 4 + .../PageBuilder/PlainValuesPacker.php | 4 + .../RowGroupBuilder/PageContainers.php | 4 - .../Integration/IO/SimpleTypesWritingTest.php | 320 +++++++++++++++++- 7 files changed, 325 insertions(+), 17 deletions(-) diff --git a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/DataPageBuilder.php b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/DataPageBuilder.php index 7798d8baf..d69582132 100644 --- a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/DataPageBuilder.php +++ b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/DataPageBuilder.php @@ -51,7 +51,7 @@ public function build(FlatColumn $column, array $rows, ?array $dictionary = null \strlen($pageBuffer), dataPageHeader: new DataPageHeader( $dictionary && $indices ? Encodings::RLE_DICTIONARY : Encodings::PLAIN, - $dictionary && $indices ? \count($indices) : \count($shredded->values), + \count($shredded->definitions) ), dataPageHeaderV2: null, dictionaryPageHeader: null, diff --git a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/DictionaryBuilder/FloatDictionaryBuilder.php b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/DictionaryBuilder/FloatDictionaryBuilder.php index e7bdb30a3..999024587 100644 --- a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/DictionaryBuilder/FloatDictionaryBuilder.php +++ b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/DictionaryBuilder/FloatDictionaryBuilder.php @@ -15,6 +15,10 @@ public function build(array $rows) : Dictionary $dictionarySize = 0; foreach (array_flatten($rows) as $value) { + if ($value === null) { + continue; + } + $hash = \serialize($value); if (!isset($valueToIndex[$hash])) { diff --git a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/DictionaryBuilder/ObjectDictionaryBuilder.php b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/DictionaryBuilder/ObjectDictionaryBuilder.php index 711050830..457107b77 100644 --- a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/DictionaryBuilder/ObjectDictionaryBuilder.php +++ b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/DictionaryBuilder/ObjectDictionaryBuilder.php @@ -15,6 +15,10 @@ public function build(array $rows) : Dictionary $dictionarySize = 0; foreach (array_flatten($rows) as $value) { + if ($value === null) { + continue; + } + $hash = \serialize($value); if (!isset($valueToIndex[$hash])) { diff --git a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/DictionaryBuilder/ScalarDictionaryBuilder.php b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/DictionaryBuilder/ScalarDictionaryBuilder.php index c76abc84b..a4993b4a1 100644 --- a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/DictionaryBuilder/ScalarDictionaryBuilder.php +++ b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/DictionaryBuilder/ScalarDictionaryBuilder.php @@ -15,6 +15,10 @@ public function build(array $rows) : Dictionary $dictionarySize = 0; foreach (array_flatten($rows) as $value) { + if ($value === null) { + continue; + } + if (!isset($valueToIndex[$value])) { $dictionary[] = $value; $valueToIndex[$value] = $dictionarySize; diff --git a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/PlainValuesPacker.php b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/PlainValuesPacker.php index 4f8090404..798fb3a87 100644 --- a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/PlainValuesPacker.php +++ b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/PlainValuesPacker.php @@ -21,6 +21,10 @@ public function packValues(FlatColumn $column, array $values) : string $parquetValues = []; foreach ($values as $value) { + if ($value === null) { + continue; + } + $parquetValues[] = $this->dataConverter->toParquetType($column, $value); } diff --git a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageContainers.php b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageContainers.php index c47076346..773fb3818 100644 --- a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageContainers.php +++ b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageContainers.php @@ -99,10 +99,6 @@ public function size() : int public function valuesCount() : int { - if ($this->dictionaryPageContainer !== null) { - return \count($this->dictionaryPageContainer->values); - } - $count = 0; foreach ($this->dataPageContainers as $pageContainer) { diff --git a/src/lib/parquet/tests/Flow/Parquet/Tests/Integration/IO/SimpleTypesWritingTest.php b/src/lib/parquet/tests/Flow/Parquet/Tests/Integration/IO/SimpleTypesWritingTest.php index f2e741402..6afc802df 100644 --- a/src/lib/parquet/tests/Flow/Parquet/Tests/Integration/IO/SimpleTypesWritingTest.php +++ b/src/lib/parquet/tests/Flow/Parquet/Tests/Integration/IO/SimpleTypesWritingTest.php @@ -32,11 +32,35 @@ public function test_writing_bool_column() : void $inputData, \iterator_to_array((new Reader())->read($path)->values()) ); + + $this->assertTrue(\file_exists($path)); + \unlink($path); } public function test_writing_bool_nullable_column() : void { - $this->markTestSkipped('Not implemented yet'); + $path = \sys_get_temp_dir() . '/test-writer' . \uniqid('parquet-test-', true) . '.parquet'; + + $writer = new Writer(); + $schema = Schema::with(FlatColumn::boolean('boolean')); + + $inputData = \array_merge(...\array_map(static function (int $i) : array { + return [ + [ + 'boolean' => $i % 2 == 0 ? (bool) \random_int(0, 1) : null, + ], + ]; + }, \range(1, 100))); + + $writer->write($path, $schema, $inputData); + + $this->assertSame( + $inputData, + \iterator_to_array((new Reader())->read($path)->values()) + ); + + $this->assertTrue(\file_exists($path)); + \unlink($path); } public function test_writing_date_column() : void @@ -62,11 +86,37 @@ public function test_writing_date_column() : void $inputData, \iterator_to_array((new Reader())->read($path)->values()) ); + + $this->assertTrue(\file_exists($path)); + \unlink($path); } public function test_writing_date_nullable_column() : void { - $this->markTestSkipped('Not implemented yet'); + $path = \sys_get_temp_dir() . '/test-writer' . \uniqid('parquet-test-', true) . '.parquet'; + + $writer = new Writer(); + $schema = Schema::with(FlatColumn::date('date')); + + $faker = Factory::create(); + + $inputData = \array_merge(...\array_map(static function (int $i) use ($faker) : array { + return [ + [ + 'date' => $i % 2 === 0 ? \DateTimeImmutable::createFromMutable($faker->dateTimeThisYear)->setTime(0, 0, 0, 0) : null, + ], + ]; + }, \range(1, 100))); + + $writer->write($path, $schema, $inputData); + + $this->assertEquals( + $inputData, + \iterator_to_array((new Reader())->read($path)->values()) + ); + + $this->assertTrue(\file_exists($path)); + \unlink($path); } public function test_writing_decimal_column() : void @@ -92,11 +142,37 @@ public function test_writing_decimal_column() : void $inputData, \iterator_to_array((new Reader())->read($path)->values()) ); + + $this->assertTrue(\file_exists($path)); + \unlink($path); } public function test_writing_decimal_nullable_column() : void { - $this->markTestSkipped('Not implemented yet'); + $path = \sys_get_temp_dir() . '/test-writer' . \uniqid('parquet-test-', true) . '.parquet'; + + $writer = new Writer(); + $schema = Schema::with(FlatColumn::decimal('decimal')); + + $faker = Factory::create(); + + $inputData = \array_merge(...\array_map(static function (int $i) use ($faker) : array { + return [ + [ + 'decimal' => $i % 2 === 0 ? \round($faker->randomFloat(5), 2) : null, + ], + ]; + }, \range(1, 100))); + + $writer->write($path, $schema, $inputData); + + $this->assertEquals( + $inputData, + \iterator_to_array((new Reader())->read($path)->values()) + ); + + $this->assertTrue(\file_exists($path)); + \unlink($path); } public function test_writing_double_column() : void @@ -122,11 +198,37 @@ public function test_writing_double_column() : void $inputData, \iterator_to_array((new Reader())->read($path)->values()) ); + + $this->assertTrue(\file_exists($path)); + \unlink($path); } public function test_writing_double_nullable_column() : void { - $this->markTestSkipped('Not implemented yet'); + $path = \sys_get_temp_dir() . '/test-writer' . \uniqid('parquet-test-', true) . '.parquet'; + + $writer = new Writer(); + $schema = Schema::with(FlatColumn::double('double')); + + $faker = Factory::create(); + + $inputData = \array_merge(...\array_map(static function (int $i) use ($faker) : array { + return [ + [ + 'double' => $i % 2 === 0 ? $faker->randomFloat() : null, + ], + ]; + }, \range(1, 100))); + + $writer->write($path, $schema, $inputData); + + $this->assertEquals( + $inputData, + \iterator_to_array((new Reader())->read($path)->values()) + ); + + $this->assertTrue(\file_exists($path)); + \unlink($path); } public function test_writing_enum_column() : void @@ -155,11 +257,35 @@ public function test_writing_float_column() : void $inputData, \iterator_to_array((new Reader())->read($path)->values()) ); + + $this->assertTrue(\file_exists($path)); + \unlink($path); } public function test_writing_float_nullable_column() : void { - $this->markTestSkipped('Not implemented yet'); + $path = \sys_get_temp_dir() . '/test-writer' . \uniqid('parquet-test-', true) . '.parquet'; + + $writer = new Writer(); + $schema = Schema::with(FlatColumn::float('float')); + + $inputData = \array_merge(...\array_map(static function (int $i) : array { + return [ + [ + 'float' => $i % 2 === 0 ? 10.25 : null, + ], + ]; + }, \range(1, 100))); + + $writer->write($path, $schema, $inputData); + + $this->assertEquals( + $inputData, + \iterator_to_array((new Reader())->read($path)->values()) + ); + + $this->assertTrue(\file_exists($path)); + \unlink($path); } public function test_writing_int32_column() : void @@ -185,11 +311,37 @@ public function test_writing_int32_column() : void $inputData, \iterator_to_array((new Reader())->read($path)->values()) ); + + $this->assertTrue(\file_exists($path)); + \unlink($path); } public function test_writing_int32_nullable_column() : void { - $this->markTestSkipped('Not implemented yet'); + $path = \sys_get_temp_dir() . '/test-writer' . \uniqid('parquet-test-', true) . '.parquet'; + + $writer = new Writer(); + $schema = Schema::with(FlatColumn::int32('int32')); + + $faker = Factory::create(); + + $inputData = \array_merge(...\array_map(static function (int $i) use ($faker) : array { + return [ + [ + 'int32' => $i % 2 === 0 ? $faker->numberBetween(0, Consts::PHP_INT32_MAX) : null, + ], + ]; + }, \range(1, 100))); + + $writer->write($path, $schema, $inputData); + + $this->assertEquals( + $inputData, + \iterator_to_array((new Reader())->read($path)->values()) + ); + + $this->assertTrue(\file_exists($path)); + \unlink($path); } public function test_writing_int64() : void @@ -215,11 +367,35 @@ public function test_writing_int64() : void $inputData, \iterator_to_array((new Reader())->read($path)->values()) ); + $this->assertTrue(\file_exists($path)); + \unlink($path); } public function test_writing_int64_nullable_column() : void { - $this->markTestSkipped('Not implemented yet'); + $path = \sys_get_temp_dir() . '/test-writer' . \uniqid('parquet-test-', true) . '.parquet'; + + $writer = new Writer(); + $schema = Schema::with(FlatColumn::int64('int64')); + + $faker = Factory::create(); + + $inputData = \array_merge(...\array_map(static function (int $i) use ($faker) : array { + return [ + [ + 'int64' => $i % 2 === 0 ? $faker->numberBetween(0, Consts::PHP_INT64_MAX) : null, + ], + ]; + }, \range(1, 100))); + + $writer->write($path, $schema, $inputData); + + $this->assertEquals( + $inputData, + \iterator_to_array((new Reader())->read($path)->values()) + ); + $this->assertTrue(\file_exists($path)); + \unlink($path); } public function test_writing_json_column() : void @@ -245,11 +421,37 @@ public function test_writing_json_column() : void $inputData, \iterator_to_array((new Reader())->read($path)->values()) ); + $this->assertTrue(\file_exists($path)); + \unlink($path); } public function test_writing_json_nullable_column() : void { - $this->markTestSkipped('Not implemented yet'); + $path = \sys_get_temp_dir() . '/test-writer' . \uniqid('parquet-test-', true) . '.parquet'; + + $writer = new Writer(); + $schema = Schema::with(FlatColumn::json('json')); + + $faker = Factory::create(); + + $inputData = \array_merge(...\array_map(static function (int $i) use ($faker) : array { + return [ + [ + 'json' => $i % 2 === 0 + ? \json_encode(['street' => $faker->streetName, 'city' => $faker->city, 'country' => $faker->country, 'zip' => $faker->postcode], JSON_THROW_ON_ERROR) + : null, + ], + ]; + }, \range(1, 100))); + + $writer->write($path, $schema, $inputData); + + $this->assertEquals( + $inputData, + \iterator_to_array((new Reader())->read($path)->values()) + ); + $this->assertTrue(\file_exists($path)); + \unlink($path); } public function test_writing_string_column() : void @@ -275,11 +477,35 @@ public function test_writing_string_column() : void $inputData, \iterator_to_array((new Reader())->read($path)->values()) ); + $this->assertTrue(\file_exists($path)); + \unlink($path); } public function test_writing_string_nullable_column() : void { - $this->markTestSkipped('Not implemented yet'); + $path = \sys_get_temp_dir() . '/test-writer' . \uniqid('parquet-test-', true) . '.parquet'; + + $writer = new Writer(); + $schema = Schema::with(FlatColumn::string('string')); + + $faker = Factory::create(); + + $inputData = \array_merge(...\array_map(static function (int $i) use ($faker) : array { + return [ + [ + 'string' => $i % 2 === 0 ? $faker->text(50) : null, + ], + ]; + }, \range(1, 100))); + + $writer->write($path, $schema, $inputData); + + $this->assertEquals( + $inputData, + \iterator_to_array((new Reader())->read($path)->values()) + ); + $this->assertTrue(\file_exists($path)); + \unlink($path); } public function test_writing_time_column() : void @@ -303,11 +529,33 @@ public function test_writing_time_column() : void $inputData, \iterator_to_array((new Reader())->read($path)->values()) ); + $this->assertTrue(\file_exists($path)); + \unlink($path); } public function test_writing_time_nullable_column() : void { - $this->markTestSkipped('Not implemented yet'); + $path = \sys_get_temp_dir() . '/test-writer' . \uniqid('parquet-test-', true) . '.parquet'; + + $writer = new Writer(); + $schema = Schema::with(FlatColumn::time('time')); + + $inputData = \array_merge(...\array_map(static function (int $i) : array { + return [ + [ + 'time' => $i % 2 === 0 ? (new \DateTimeImmutable('2023-01-01 00:00:00 UTC'))->diff(new \DateTimeImmutable('2023-01-01 15:45:00 UTC')) : null, + ], + ]; + }, \range(1, 100))); + + $writer->write($path, $schema, $inputData); + + $this->assertEquals( + $inputData, + \iterator_to_array((new Reader())->read($path)->values()) + ); + $this->assertTrue(\file_exists($path)); + \unlink($path); } public function test_writing_timestamp_column() : void @@ -333,11 +581,35 @@ public function test_writing_timestamp_column() : void $inputData, \iterator_to_array((new Reader())->read($path)->values()) ); + $this->assertTrue(\file_exists($path)); + \unlink($path); } public function test_writing_timestamp_nullable_column() : void { - $this->markTestSkipped('Not implemented yet'); + $path = \sys_get_temp_dir() . '/test-writer' . \uniqid('parquet-test-', true) . '.parquet'; + + $writer = new Writer(); + $schema = Schema::with(FlatColumn::dateTime('dateTime')); + + $faker = Factory::create(); + + $inputData = \array_merge(...\array_map(static function (int $i) use ($faker) : array { + return [ + [ + 'dateTime' => $i % 2 === 0 ? $faker->dateTimeThisYear : null, + ], + ]; + }, \range(1, 100))); + + $writer->write($path, $schema, $inputData); + + $this->assertEquals( + $inputData, + \iterator_to_array((new Reader())->read($path)->values()) + ); + $this->assertTrue(\file_exists($path)); + \unlink($path); } public function test_writing_uuid_column() : void @@ -363,10 +635,34 @@ public function test_writing_uuid_column() : void $inputData, \iterator_to_array((new Reader())->read($path)->values()) ); + $this->assertTrue(\file_exists($path)); + \unlink($path); } public function test_writing_uuid_nullable_column() : void { - $this->markTestSkipped('Not implemented yet'); + $path = \sys_get_temp_dir() . '/test-writer' . \uniqid('parquet-test-', true) . '.parquet'; + + $writer = new Writer(); + $schema = Schema::with(FlatColumn::uuid('uuid')); + + $faker = Factory::create(); + + $inputData = \array_merge(...\array_map(static function (int $i) use ($faker) : array { + return [ + [ + 'uuid' => $i % 2 === 0 ? $faker->uuid : null, + ], + ]; + }, \range(1, 100))); + + $writer->write($path, $schema, $inputData); + + $this->assertEquals( + $inputData, + \iterator_to_array((new Reader())->read($path)->values()) + ); + $this->assertTrue(\file_exists($path)); + \unlink($path); } }