From 009e0fcb57cc330e55c473d7f976a80ffa3e2eb5 Mon Sep 17 00:00:00 2001 From: Norbert Orzechowicz Date: Mon, 6 Nov 2023 21:15:19 +0100 Subject: [PATCH] Parquet - write column chunk statistics --- .../ColumnChunkStatisticsTest.php | 238 ++++++++++++++++++ .../RowGroupBuilder/ColumnChunkBuilder.php | 4 +- .../RowGroupBuilder/ColumnChunkStatistics.php | 44 ++++ .../RowGroupBuilder/Statistics/Comparator.php | 52 ++++ .../RowGroupBuilder/StatisticsBuilder.php | 35 +++ .../Integration/IO/SimpleTypesWritingTest.php | 25 +- .../Tests/Integration/IO/WriterTest.php | 29 +++ 7 files changed, 425 insertions(+), 2 deletions(-) create mode 100644 src/adapter/etl-adapter-parquet/tests/Flow/ETL/Adapter/Parquet/Tests/Unit/ParquetFile/RowGroupBuilder/ColumnChunkStatisticsTest.php create mode 100644 src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/Statistics/Comparator.php create mode 100644 src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/StatisticsBuilder.php diff --git a/src/adapter/etl-adapter-parquet/tests/Flow/ETL/Adapter/Parquet/Tests/Unit/ParquetFile/RowGroupBuilder/ColumnChunkStatisticsTest.php b/src/adapter/etl-adapter-parquet/tests/Flow/ETL/Adapter/Parquet/Tests/Unit/ParquetFile/RowGroupBuilder/ColumnChunkStatisticsTest.php new file mode 100644 index 000000000..fddecd836 --- /dev/null +++ b/src/adapter/etl-adapter-parquet/tests/Flow/ETL/Adapter/Parquet/Tests/Unit/ParquetFile/RowGroupBuilder/ColumnChunkStatisticsTest.php @@ -0,0 +1,238 @@ +add(true); + $statistics->add(false); + $statistics->add(true); + $statistics->add(true); + $statistics->add(false); + $statistics->add(false); + $statistics->add(null); + + $this->assertFalse($statistics->min()); + $this->assertTrue($statistics->max()); + $this->assertSame(7, $statistics->valuesCount()); + $this->assertSame(2, $statistics->distinctCount()); + $this->assertSame(1, $statistics->nullCount()); + } + + public function test_statistics_for_date() : void + { + $statistics = new ColumnChunkStatistics(FlatColumn::date('date')); + + $statistics->add(new \DateTimeImmutable('2020-01-01')); + $statistics->add(new \DateTimeImmutable('2020-01-02')); + $statistics->add(new \DateTimeImmutable('2020-01-03')); + $statistics->add(new \DateTimeImmutable('2020-01-04')); + $statistics->add(new \DateTimeImmutable('2020-01-05')); + $statistics->add(new \DateTimeImmutable('2020-01-05')); + $statistics->add(null); + + $this->assertSame('2020-01-01', $statistics->min()->format('Y-m-d')); + $this->assertSame('2020-01-05', $statistics->max()->format('Y-m-d')); + $this->assertSame(7, $statistics->valuesCount()); + $this->assertSame(5, $statistics->distinctCount()); + $this->assertSame(1, $statistics->nullCount()); + } + + public function test_statistics_for_decimal() : void + { + $statistics = new ColumnChunkStatistics(FlatColumn::decimal('decimal')); + + $statistics->add('1.1'); + $statistics->add('2.2'); + $statistics->add('3.3'); + $statistics->add('4.4'); + $statistics->add('5.5'); + $statistics->add('5.5'); + $statistics->add(null); + + $this->assertSame('1.1', $statistics->min()); + $this->assertSame('5.5', $statistics->max()); + $this->assertSame(7, $statistics->valuesCount()); + $this->assertSame(5, $statistics->distinctCount()); + $this->assertSame(1, $statistics->nullCount()); + } + + public function test_statistics_for_double() : void + { + $statistics = new ColumnChunkStatistics(FlatColumn::double('double')); + + $statistics->add(1.1); + $statistics->add(2.2); + $statistics->add(3.3); + $statistics->add(4.4); + $statistics->add(5.5); + $statistics->add(5.5); + $statistics->add(null); + + $this->assertSame(1.1, $statistics->min()); + $this->assertSame(5.5, $statistics->max()); + $this->assertSame(7, $statistics->valuesCount()); + $this->assertSame(5, $statistics->distinctCount()); + $this->assertSame(1, $statistics->nullCount()); + } + + public function test_statistics_for_enum() : void + { + $statistics = new ColumnChunkStatistics(FlatColumn::enum('enum')); + + $statistics->add('a'); + $statistics->add('b'); + $statistics->add('c'); + $statistics->add('d'); + $statistics->add('e'); + $statistics->add('e'); + $statistics->add(null); + + $this->assertSame('a', $statistics->min()); + $this->assertSame('e', $statistics->max()); + $this->assertSame(7, $statistics->valuesCount()); + $this->assertSame(5, $statistics->distinctCount()); + $this->assertSame(1, $statistics->nullCount()); + } + + public function test_statistics_for_float() : void + { + $statistics = new ColumnChunkStatistics(FlatColumn::float('float')); + + $statistics->add(1.1); + $statistics->add(2.2); + $statistics->add(3.3); + $statistics->add(4.4); + $statistics->add(5.5); + $statistics->add(5.5); + $statistics->add(null); + + $this->assertSame(1.1, $statistics->min()); + $this->assertSame(5.5, $statistics->max()); + $this->assertSame(7, $statistics->valuesCount()); + $this->assertSame(5, $statistics->distinctCount()); + $this->assertSame(1, $statistics->nullCount()); + } + + public function test_statistics_for_int32() : void + { + $statistics = new ColumnChunkStatistics(FlatColumn::int32('int32')); + + $statistics->add(1); + $statistics->add(2); + $statistics->add(3); + $statistics->add(4); + $statistics->add(5); + $statistics->add(5); + $statistics->add(null); + + $this->assertSame(1, $statistics->min()); + $this->assertSame(5, $statistics->max()); + $this->assertSame(7, $statistics->valuesCount()); + $this->assertSame(5, $statistics->distinctCount()); + $this->assertSame(1, $statistics->nullCount()); + } + + public function test_statistics_for_int64() : void + { + $statistics = new ColumnChunkStatistics(FlatColumn::int64('int64')); + + $statistics->add(1); + $statistics->add(2); + $statistics->add(3); + $statistics->add(4); + $statistics->add(5); + $statistics->add(5); + $statistics->add(null); + + $this->assertSame(1, $statistics->min()); + $this->assertSame(5, $statistics->max()); + $this->assertSame(7, $statistics->valuesCount()); + $this->assertSame(5, $statistics->distinctCount()); + $this->assertSame(1, $statistics->nullCount()); + } + + public function test_statistics_for_json() : void + { + $statistics = new ColumnChunkStatistics(FlatColumn::json('json')); + + $statistics->add('{"a":1}'); + $statistics->add('{"b":2}'); + $statistics->add('{"c":3}'); + $statistics->add('{"d":4}'); + $statistics->add('{"e":5}'); + $statistics->add('{"e":5}'); + $statistics->add(null); + + $this->assertSame('{"a":1}', $statistics->min()); + $this->assertSame('{"e":5}', $statistics->max()); + $this->assertSame(7, $statistics->valuesCount()); + $this->assertSame(5, $statistics->distinctCount()); + $this->assertSame(1, $statistics->nullCount()); + } + + public function test_statistics_for_string() : void + { + $statistics = new ColumnChunkStatistics(FlatColumn::string('string')); + + $statistics->add('a'); + $statistics->add('b'); + $statistics->add('c'); + $statistics->add('d'); + $statistics->add('e'); + $statistics->add('e'); + $statistics->add(null); + + $this->assertSame('a', $statistics->min()); + $this->assertSame('e', $statistics->max()); + $this->assertSame(7, $statistics->valuesCount()); + $this->assertSame(5, $statistics->distinctCount()); + $this->assertSame(1, $statistics->nullCount()); + } + + public function test_statistics_for_time() : void + { + $statistics = new ColumnChunkStatistics(FlatColumn::time('time')); + + $statistics->add(new \DateInterval('PT1S')); + $statistics->add(new \DateInterval('PT2S')); + $statistics->add(new \DateInterval('PT3S')); + $statistics->add(new \DateInterval('PT4S')); + $statistics->add(new \DateInterval('PT5S')); + $statistics->add(new \DateInterval('PT5S')); + $statistics->add(null); + + $this->assertSame('PT01S', $statistics->min()->format('PT%SS')); + $this->assertSame('PT05S', $statistics->max()->format('PT%SS')); + $this->assertSame(7, $statistics->valuesCount()); + $this->assertSame(5, $statistics->distinctCount()); + $this->assertSame(1, $statistics->nullCount()); + } + + public function test_statistics_for_uuid() : void + { + $statistics = new ColumnChunkStatistics(FlatColumn::uuid('uuid')); + + $statistics->add('00000000-0000-0000-0000-000000000000'); + $statistics->add('00000000-0000-0000-0000-000000000001'); + $statistics->add('00000000-0000-0000-0000-000000000002'); + $statistics->add('00000000-0000-0000-0000-000000000003'); + $statistics->add('00000000-0000-0000-0000-000000000004'); + $statistics->add('00000000-0000-0000-0000-000000000004'); + $statistics->add(null); + + $this->assertSame('00000000-0000-0000-0000-000000000000', $statistics->min()); + $this->assertSame('00000000-0000-0000-0000-000000000004', $statistics->max()); + $this->assertSame(7, $statistics->valuesCount()); + $this->assertSame(5, $statistics->distinctCount()); + $this->assertSame(1, $statistics->nullCount()); + } +} diff --git a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/ColumnChunkBuilder.php b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/ColumnChunkBuilder.php index 7abd3bbda..3cf1559bf 100644 --- a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/ColumnChunkBuilder.php +++ b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/ColumnChunkBuilder.php @@ -35,6 +35,8 @@ public function flush(int $fileOffset) : ColumnChunkContainer $pageContainers = (new PagesBuilder($this->dataConverter, $this->compression, $this->calculator, $this->options)) ->build($this->column, $this->rows, $this->statistics); + $statistics = (new StatisticsBuilder($this->dataConverter))->build($this->column, $this->statistics); + $this->statistics->reset(); return new ColumnChunkContainer( @@ -51,7 +53,7 @@ public function flush(int $fileOffset) : ColumnChunkContainer dictionaryPageOffset: ($pageContainers->dictionaryPageContainer()) ? $fileOffset : null, dataPageOffset: ($pageContainers->dictionaryPageContainer()) ? $fileOffset + $pageContainers->dictionaryPageContainer()->totalCompressedSize() : $fileOffset, indexPageOffset: null, - statistics: null + statistics: $statistics ) ); } diff --git a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/ColumnChunkStatistics.php b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/ColumnChunkStatistics.php index 9a2f4a1de..e6b819328 100644 --- a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/ColumnChunkStatistics.php +++ b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/ColumnChunkStatistics.php @@ -4,6 +4,7 @@ use Flow\Parquet\Data\ObjectToString; use Flow\Parquet\Exception\RuntimeException; +use Flow\Parquet\ParquetFile\RowGroupBuilder\Statistics\Comparator; use Flow\Parquet\ParquetFile\Schema\ColumnPrimitiveType; use Flow\Parquet\ParquetFile\Schema\FlatColumn; use Flow\Parquet\ParquetFile\Schema\PhysicalType; @@ -12,6 +13,12 @@ final class ColumnChunkStatistics { private bool $columnIsString; + private Comparator $comparator; + + private mixed $max; + + private mixed $min; + private int $nullCount; private int $totalStringLength; @@ -26,6 +33,9 @@ public function __construct(private readonly FlatColumn $column) $this->valuesCount = 0; $this->totalStringLength = 0; $this->columnIsString = ColumnPrimitiveType::isString($this->column); + $this->min = null; + $this->max = null; + $this->comparator = new Comparator(); } public function add(string|int|float|null|array|bool|object $value) : void @@ -44,9 +54,26 @@ public function add(string|int|float|null|array|bool|object $value) : void if (\is_array($value)) { foreach ($value as $val) { + + if ($this->comparator->isLessThan($val, $this->min)) { + $this->min = $val; + } + + if ($this->comparator->isGreaterThan($val, $this->max)) { + $this->max = $val; + } + $this->values[] = \is_object($val) ? ObjectToString::toString($val) : $val; } } else { + if ($this->comparator->isLessThan($value, $this->min)) { + $this->min = $value; + } + + if ($this->comparator->isGreaterThan($value, $this->max)) { + $this->max = $value; + } + $this->values[] = \is_object($value) ? ObjectToString::toString($value) : $value; } @@ -83,6 +110,16 @@ public function distinctCount() : int return \count(\array_unique($this->values)); } + public function max() : mixed + { + return $this->max; + } + + public function min() : mixed + { + return $this->min; + } + public function notNullCount() : int { return $this->valuesCount - $this->nullCount; @@ -98,6 +135,8 @@ public function reset() : void $this->nullCount = 0; $this->valuesCount = 0; $this->totalStringLength = 0; + $this->min = null; + $this->max = null; $this->values = []; } @@ -133,6 +172,11 @@ public function uncompressedSize() : int throw new RuntimeException('Unknown column type'); } + public function values() : array + { + return $this->values; + } + public function valuesCount() : int { return $this->valuesCount; diff --git a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/Statistics/Comparator.php b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/Statistics/Comparator.php new file mode 100644 index 000000000..b185b199a --- /dev/null +++ b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/Statistics/Comparator.php @@ -0,0 +1,52 @@ +toParquetType($value); + $nextValue = (new TimeConverter())->toParquetType($nextValue); + } + + return $value > $nextValue; + } + + public function isLessThan(mixed $value, mixed $nextValue) : bool + { + if ($value === null) { + return false; + } + + if ($nextValue === null) { + return true; + } + + if (\gettype($value) !== \gettype($nextValue)) { + throw new \RuntimeException(\sprintf('Cannot compare %s with %s', \gettype($value), \gettype($nextValue))); + } + + if ($value instanceof \DateInterval) { + $value = (new TimeConverter())->toParquetType($value); + $nextValue = (new TimeConverter())->toParquetType($nextValue); + } + + return $value < $nextValue; + } +} diff --git a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/StatisticsBuilder.php b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/StatisticsBuilder.php new file mode 100644 index 000000000..eb742c1aa --- /dev/null +++ b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/StatisticsBuilder.php @@ -0,0 +1,35 @@ +dataConverter))->packValues($column, [$chunkStatistics->min()]); + (new PlainValuesPacker(new BinaryBufferWriter($maxBuffer), $this->dataConverter))->packValues($column, [$chunkStatistics->max()]); + + return new Statistics( + max: $maxBuffer, + min: $minBuffer, + nullCount: $chunkStatistics->nullCount(), + distinctCount: $chunkStatistics->distinctCount(), + maxValue: $maxBuffer, + minValue: $minBuffer, + ); + } +} diff --git a/src/lib/parquet/tests/Flow/Parquet/Tests/Integration/IO/SimpleTypesWritingTest.php b/src/lib/parquet/tests/Flow/Parquet/Tests/Integration/IO/SimpleTypesWritingTest.php index a0689ff30..27cd631c6 100644 --- a/src/lib/parquet/tests/Flow/Parquet/Tests/Integration/IO/SimpleTypesWritingTest.php +++ b/src/lib/parquet/tests/Flow/Parquet/Tests/Integration/IO/SimpleTypesWritingTest.php @@ -234,7 +234,30 @@ public function test_writing_double_nullable_column() : void public function test_writing_enum_column() : void { - $this->markTestSkipped('Not implemented yet'); + $path = \sys_get_temp_dir() . '/test-writer' . \uniqid('parquet-test-', true) . '.parquet'; + + $writer = new Writer(); + $schema = Schema::with(FlatColumn::enum('enum')); + + $enum = ['A', 'B', 'C', 'D']; + + $inputData = \array_merge(...\array_map(static function (int $i) use ($enum) : array { + return [ + [ + 'enum' => $enum[\random_int(0, 3)], + ], + ]; + }, \range(1, 100))); + + $writer->write($path, $schema, $inputData); + + $this->assertEquals( + $inputData, + \iterator_to_array((new Reader())->read($path)->values()) + ); + + $this->assertTrue(\file_exists($path)); + \unlink($path); } public function test_writing_float_column() : void diff --git a/src/lib/parquet/tests/Flow/Parquet/Tests/Integration/IO/WriterTest.php b/src/lib/parquet/tests/Flow/Parquet/Tests/Integration/IO/WriterTest.php index cef602f3a..bb5eaddf1 100644 --- a/src/lib/parquet/tests/Flow/Parquet/Tests/Integration/IO/WriterTest.php +++ b/src/lib/parquet/tests/Flow/Parquet/Tests/Integration/IO/WriterTest.php @@ -158,6 +158,35 @@ public function test_writing_batch_to_not_writable_stream() : void \unlink($path); } + public function test_writing_column_statistics() : void + { + $writer = new Writer( + options: Options::default() + ->set(Option::WRITER_VERSION, 1) + ); + + $path = \sys_get_temp_dir() . '/test-writer' . \uniqid('parquet-test-v2-', true) . '.parquet'; + + $schema = Schema::with($column = FlatColumn::int32('int32')); + + $writer->write($path, $schema, \array_map( + static fn ($i) => ['int32' => $i], + \range(1, 100) + )); + + $statistics = (new Reader())->read($path)->metadata()->columnChunks()[0]->statistics(); + + $this->assertSame(1, $statistics->min($column)); + $this->assertSame(100, $statistics->max($column)); + $this->assertSame(1, $statistics->minValue($column)); + $this->assertSame(100, $statistics->maxValue($column)); + $this->assertSame(100, $statistics->distinctCount()); + $this->assertSame(0, $statistics->nullCount()); + + $this->assertFileExists($path); + \unlink($path); + } + public function test_writing_in_batches_to_file() : void { $writer = new Writer();