Skip to content

Commit

Permalink
Added DataPageV2 statistics (#755)
Browse files Browse the repository at this point in the history
  • Loading branch information
norberttech committed Nov 6, 2023
1 parent 8a7bbbd commit e725f1e
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
namespace Flow\Parquet\ParquetFile\Page\Header;

use Flow\Parquet\ParquetFile\Encodings;
use Flow\Parquet\ParquetFile\RowGroup\StatisticsReader;
use Flow\Parquet\ParquetFile\Statistics;

/**
* @psalm-suppress RedundantConditionGivenDocblockType
Expand All @@ -17,7 +19,8 @@ public function __construct(
private readonly Encodings $encoding,
private readonly int $definitionsByteLength,
private readonly int $repetitionsByteLength,
private readonly ?bool $isCompressed
private readonly ?bool $isCompressed,
private readonly ?Statistics $statistics
) {
}

Expand All @@ -32,7 +35,9 @@ public static function fromThrift(\Flow\Parquet\Thrift\DataPageHeaderV2 $thrift)
$thrift->definition_levels_byte_length,
$thrift->repetition_levels_byte_length,
/** @phpstan-ignore-next-line */
$thrift->is_compressed ?? null
$thrift->is_compressed ?? null,
/** @phpstan-ignore-next-line */
$thrift->statistics ? Statistics::fromThrift($thrift->statistics) : null
);
}

Expand All @@ -51,6 +56,15 @@ public function repetitionsByteLength() : int
return $this->repetitionsByteLength;
}

/**
 * Exposes the page statistics through a StatisticsReader.
 *
 * @return null|StatisticsReader null when this header was built without statistics
 */
public function statistics() : ?StatisticsReader
{
    return null === $this->statistics
        ? null
        : new StatisticsReader($this->statistics);
}

public function toThrift() : \Flow\Parquet\Thrift\DataPageHeaderV2
{
return new \Flow\Parquet\Thrift\DataPageHeaderV2([
Expand All @@ -61,6 +75,7 @@ public function toThrift() : \Flow\Parquet\Thrift\DataPageHeaderV2
'repetition_levels_byte_length' => $this->repetitionsByteLength,
'encoding' => $this->encoding->value,
'is_compressed' => $this->isCompressed,
'statistics' => $this->statistics?->toThrift(),
]);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,14 @@ private function buildDataPage(array $rows, FlatColumn $column, ?array $dictiona

private function buildDataPageV2(array $rows, FlatColumn $column, ?array $dictionary, ?array $indices) : PageContainer
{
$statistics = new DataPageV2Statistics();

foreach ($rows as $row) {
$statistics->add($row);
}

$statistics = (new StatisticsBuilder($this->dataConverter))->build($column, $statistics);

$shredded = (new Dremel())->shred($rows, $column->maxDefinitionsLevel());

$rleBitPackedHybrid = new RLEBitPackedHybrid();
Expand Down Expand Up @@ -138,6 +146,7 @@ private function buildDataPageV2(array $rows, FlatColumn $column, ?array $dictio
definitionsByteLength: $definitionsLength,
repetitionsByteLength: $repetitionsLength,
isCompressed: !($this->compression === Compressions::UNCOMPRESSED),
statistics: $statistics,
),
dictionaryPageHeader: null,
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
<?php declare(strict_types=1);

namespace Flow\Parquet\ParquetFile\RowGroupBuilder\PageBuilder;

use Flow\Parquet\Data\ObjectToString;
use Flow\Parquet\ParquetFile\RowGroupBuilder\Statistics\Comparator;

/**
 * Accumulator for statistics of the values passed to add().
 *
 * It tracks the running minimum and maximum (as decided by Comparator), the number
 * of null values, the total number of values and a copy of every non-null value,
 * which is later used to compute the distinct count.
 */
final class DataPageV2Statistics
{
    private Comparator $comparator;

    private mixed $max = null;

    private mixed $min = null;

    private int $nullCount = 0;

    private array $values = [];

    private int $valuesCount = 0;

    public function __construct()
    {
        $this->comparator = new Comparator();
    }

    /**
     * Registers a single value, or every element of an array value.
     *
     * A top-level null only bumps the value and null counters. Elements of an
     * array are not null-checked here; they are handed to the comparator as-is.
     */
    public function add(string|int|float|null|array|bool|object $value) : void
    {
        // An array contributes one value per element, anything else counts once.
        $this->valuesCount += \is_array($value) ? \count($value) : 1;

        if (null === $value) {
            $this->nullCount++;

            return;
        }

        $items = \is_array($value) ? $value : [$value];

        foreach ($items as $item) {
            $this->track($item);
        }
    }

    /**
     * Number of unique recorded values, using array_unique's default comparison.
     */
    public function distinctCount() : int
    {
        return [] === $this->values
            ? 0
            : \count(\array_unique($this->values));
    }

    public function max() : mixed
    {
        return $this->max;
    }

    public function min() : mixed
    {
        return $this->min;
    }

    public function nullCount() : int
    {
        return $this->nullCount;
    }

    /**
     * Recorded values in insertion order; objects are stored in their string form.
     */
    public function values() : array
    {
        return $this->values;
    }

    public function valuesCount() : int
    {
        return $this->valuesCount;
    }

    /**
     * Updates min/max against one value and appends it to the recorded values.
     */
    private function track(mixed $item) : void
    {
        if ($this->comparator->isLessThan($item, $this->min)) {
            $this->min = $item;
        }

        if ($this->comparator->isGreaterThan($item, $this->max)) {
            $this->max = $item;
        }

        // Objects are converted to strings so they can be compared by array_unique later.
        $this->values[] = \is_object($item) ? ObjectToString::toString($item) : $item;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<?php declare(strict_types=1);

namespace Flow\Parquet\ParquetFile\RowGroupBuilder\PageBuilder;

use Flow\Parquet\BinaryWriter\BinaryBufferWriter;
use Flow\Parquet\Data\DataConverter;
use Flow\Parquet\ParquetFile\Data\PlainValuesPacker;
use Flow\Parquet\ParquetFile\Schema\FlatColumn;
use Flow\Parquet\ParquetFile\Statistics;

/**
 * Builds a Statistics object out of accumulated DataPageV2Statistics.
 *
 * Min and max are serialized with PlainValuesPacker for the given column.
 */
final class StatisticsBuilder
{
    public function __construct(private readonly DataConverter $dataConverter)
    {
    }

    /**
     * Packs the min and max of $chunkStatistics for $column and combines them with
     * the null and distinct counts. The same packed bytes are used for both
     * min/minValue and max/maxValue.
     */
    public function build(FlatColumn $column, DataPageV2Statistics $chunkStatistics) : Statistics
    {
        $packedMin = $this->pack($column, $chunkStatistics->min());
        $packedMax = $this->pack($column, $chunkStatistics->max());

        return new Statistics(
            max: $packedMax,
            min: $packedMin,
            nullCount: $chunkStatistics->nullCount(),
            distinctCount: $chunkStatistics->distinctCount(),
            maxValue: $packedMax,
            minValue: $packedMin,
        );
    }

    /**
     * Packs a single value for the column into a fresh buffer and returns that buffer.
     */
    private function pack(FlatColumn $column, mixed $value) : string
    {
        $buffer = '';

        $packer = new PlainValuesPacker(new BinaryBufferWriter($buffer), $this->dataConverter);
        $packer->packValues($column, [$value]);

        return $buffer;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,36 @@ public function test_writing_column_statistics() : void
\unlink($path);
}

/**
 * Writes the integers 1..100 into an int32 column with writer version 2 and
 * verifies the statistics read back from every DataPageV2 header.
 */
public function test_writing_data_page_v2_statistics() : void
{
    $writer = new Writer(
        options: Options::default()
            ->set(Option::WRITER_VERSION, 2)
    );

    $path = \sys_get_temp_dir() . '/test-writer' . \uniqid('parquet-test-v2-', true) . '.parquet';

    $schema = Schema::with($column = FlatColumn::int32('int32'));

    try {
        $writer->write($path, $schema, \array_map(
            static fn ($i) => ['int32' => $i],
            \range(1, 100)
        ));

        foreach ((new Reader())->read($path)->pageHeaders() as $pageHeader) {
            $statistics = $pageHeader->pageHeader->dataPageHeaderV2()->statistics();

            // statistics() is nullable; fail with a readable message instead of a call on null.
            $this->assertNotNull($statistics, 'DataPageV2 header is expected to carry statistics');

            $this->assertSame(1, $statistics->min($column));
            $this->assertSame(100, $statistics->max($column));
            $this->assertSame(1, $statistics->minValue($column));
            $this->assertSame(100, $statistics->maxValue($column));
            $this->assertSame(100, $statistics->distinctCount());
            $this->assertSame(0, $statistics->nullCount());
        }
    } finally {
        // Clean up the temporary file even when the write, the read or an assertion fails.
        if (\file_exists($path)) {
            \unlink($path);
        }
    }
}

public function test_writing_in_batches_to_file() : void
{
$writer = new Writer();
Expand Down

0 comments on commit e725f1e

Please sign in to comment.