From e725f1ecb34ee2741496709af6966b136b984d61 Mon Sep 17 00:00:00 2001 From: Norbert Orzechowicz <1921950+norberttech@users.noreply.github.com> Date: Mon, 6 Nov 2023 23:44:43 +0100 Subject: [PATCH] Added DataPageV2 statistics (#755) --- .../Page/Header/DataPageHeaderV2.php | 19 +++- .../PageBuilder/DataPageBuilder.php | 9 ++ .../PageBuilder/DataPageV2Statistics.php | 104 ++++++++++++++++++ .../PageBuilder/StatisticsBuilder.php | 35 ++++++ .../Tests/Integration/IO/WriterTest.php | 30 +++++ 5 files changed, 195 insertions(+), 2 deletions(-) create mode 100644 src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/DataPageV2Statistics.php create mode 100644 src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/StatisticsBuilder.php diff --git a/src/lib/parquet/src/Flow/Parquet/ParquetFile/Page/Header/DataPageHeaderV2.php b/src/lib/parquet/src/Flow/Parquet/ParquetFile/Page/Header/DataPageHeaderV2.php index be1512a1a..ebef96c38 100644 --- a/src/lib/parquet/src/Flow/Parquet/ParquetFile/Page/Header/DataPageHeaderV2.php +++ b/src/lib/parquet/src/Flow/Parquet/ParquetFile/Page/Header/DataPageHeaderV2.php @@ -3,6 +3,8 @@ namespace Flow\Parquet\ParquetFile\Page\Header; use Flow\Parquet\ParquetFile\Encodings; +use Flow\Parquet\ParquetFile\RowGroup\StatisticsReader; +use Flow\Parquet\ParquetFile\Statistics; /** * @psalm-suppress RedundantConditionGivenDocblockType @@ -17,7 +19,8 @@ public function __construct( private readonly Encodings $encoding, private readonly int $definitionsByteLength, private readonly int $repetitionsByteLength, - private readonly ?bool $isCompressed + private readonly ?bool $isCompressed, + private readonly ?Statistics $statistics ) { } @@ -32,7 +35,9 @@ public static function fromThrift(\Flow\Parquet\Thrift\DataPageHeaderV2 $thrift) $thrift->definition_levels_byte_length, $thrift->repetition_levels_byte_length, /** @phpstan-ignore-next-line */ - $thrift->is_compressed ?? null + $thrift->is_compressed ?? null, + /** @phpstan-ignore-next-line */ + $thrift->statistics ? Statistics::fromThrift($thrift->statistics) : null ); } @@ -51,6 +56,15 @@ public function repetitionsByteLength() : int return $this->repetitionsByteLength; } + public function statistics() : ?StatisticsReader + { + if ($this->statistics === null) { + return null; + } + + return new StatisticsReader($this->statistics); + } + public function toThrift() : \Flow\Parquet\Thrift\DataPageHeaderV2 { return new \Flow\Parquet\Thrift\DataPageHeaderV2([ @@ -61,6 +75,7 @@ public function toThrift() : \Flow\Parquet\Thrift\DataPageHeaderV2 'repetition_levels_byte_length' => $this->repetitionsByteLength, 'encoding' => $this->encoding->value, 'is_compressed' => $this->isCompressed, + 'statistics' => $this->statistics?->toThrift(), ]); } diff --git a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/DataPageBuilder.php b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/DataPageBuilder.php index 69c7725e3..eb7a15470 100644 --- a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/DataPageBuilder.php +++ b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/DataPageBuilder.php @@ -94,6 +94,14 @@ private function buildDataPage(array $rows, FlatColumn $column, ?array $dictiona private function buildDataPageV2(array $rows, FlatColumn $column, ?array $dictionary, ?array $indices) : PageContainer { + $statistics = new DataPageV2Statistics(); + + foreach ($rows as $row) { + $statistics->add($row); + } + + $statistics = (new StatisticsBuilder($this->dataConverter))->build($column, $statistics); + $shredded = (new Dremel())->shred($rows, $column->maxDefinitionsLevel()); $rleBitPackedHybrid = new RLEBitPackedHybrid(); @@ -138,6 +146,7 @@ private function buildDataPageV2(array $rows, FlatColumn $column, ?array $dictio definitionsByteLength: $definitionsLength, repetitionsByteLength: $repetitionsLength, isCompressed: !($this->compression === Compressions::UNCOMPRESSED), + statistics: $statistics, ), dictionaryPageHeader: null, ); diff --git a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/DataPageV2Statistics.php b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/DataPageV2Statistics.php new file mode 100644 index 000000000..8bc1a6d09 --- /dev/null +++ b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/DataPageV2Statistics.php @@ -0,0 +1,104 @@ +nullCount = 0; + $this->valuesCount = 0; + $this->min = null; + $this->max = null; + $this->comparator = new Comparator(); + } + + public function add(string|int|float|null|array|bool|object $value) : void + { + if (\is_array($value)) { + $this->valuesCount += \count($value); + } else { + $this->valuesCount++; + } + + if ($value === null) { + $this->nullCount++; + + return; + } + + if (\is_array($value)) { + foreach ($value as $val) { + + if ($this->comparator->isLessThan($val, $this->min)) { + $this->min = $val; + } + + if ($this->comparator->isGreaterThan($val, $this->max)) { + $this->max = $val; + } + + $this->values[] = \is_object($val) ? ObjectToString::toString($val) : $val; + } + } else { + if ($this->comparator->isLessThan($value, $this->min)) { + $this->min = $value; + } + + if ($this->comparator->isGreaterThan($value, $this->max)) { + $this->max = $value; + } + + $this->values[] = \is_object($value) ? ObjectToString::toString($value) : $value; + } + } + + public function distinctCount() : int + { + if ([] === $this->values) { + return 0; + } + + return \count(\array_unique($this->values)); + } + + public function max() : mixed + { + return $this->max; + } + + public function min() : mixed + { + return $this->min; + } + + public function nullCount() : int + { + return $this->nullCount; + } + + public function values() : array + { + return $this->values; + } + + public function valuesCount() : int + { + return $this->valuesCount; + } +} diff --git a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/StatisticsBuilder.php b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/StatisticsBuilder.php new file mode 100644 index 000000000..17967ccb0 --- /dev/null +++ b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/StatisticsBuilder.php @@ -0,0 +1,35 @@ +dataConverter))->packValues($column, [$chunkStatistics->min()]); + (new PlainValuesPacker(new BinaryBufferWriter($maxBuffer), $this->dataConverter))->packValues($column, [$chunkStatistics->max()]); + + return new Statistics( + max: $maxBuffer, + min: $minBuffer, + nullCount: $chunkStatistics->nullCount(), + distinctCount: $chunkStatistics->distinctCount(), + maxValue: $maxBuffer, + minValue: $minBuffer, + ); + } +} diff --git a/src/lib/parquet/tests/Flow/Parquet/Tests/Integration/IO/WriterTest.php b/src/lib/parquet/tests/Flow/Parquet/Tests/Integration/IO/WriterTest.php index bb5eaddf1..084493d5d 100644 --- a/src/lib/parquet/tests/Flow/Parquet/Tests/Integration/IO/WriterTest.php +++ b/src/lib/parquet/tests/Flow/Parquet/Tests/Integration/IO/WriterTest.php @@ -187,6 +187,36 @@ public function test_writing_column_statistics() : void \unlink($path); } + public function test_writing_data_page_v2_statistics() : void + { + $writer = new Writer( + options: Options::default() + ->set(Option::WRITER_VERSION, 2) + ); + + $path = \sys_get_temp_dir() . '/test-writer' . \uniqid('parquet-test-v2-', true) . '.parquet'; + + $schema = Schema::with($column = FlatColumn::int32('int32')); + + $writer->write($path, $schema, \array_map( + static fn ($i) => ['int32' => $i], + \range(1, 100) + )); + + foreach ((new Reader())->read($path)->pageHeaders() as $pageHeader) { + $statistics = $pageHeader->pageHeader->dataPageHeaderV2()->statistics(); + + $this->assertSame(1, $statistics->min($column)); + $this->assertSame(100, $statistics->max($column)); + $this->assertSame(1, $statistics->minValue($column)); + $this->assertSame(100, $statistics->maxValue($column)); + $this->assertSame(100, $statistics->distinctCount()); + $this->assertSame(0, $statistics->nullCount()); + } + + \unlink($path); + } + public function test_writing_in_batches_to_file() : void { $writer = new Writer();