Skip to content

Commit

Permalink
Parquet - DataPageV2 support (#736)
Browse files Browse the repository at this point in the history
  • Loading branch information
norberttech authored Nov 6, 2023
1 parent fd47395 commit 37a27c9
Show file tree
Hide file tree
Showing 14 changed files with 326 additions and 36 deletions.
10 changes: 10 additions & 0 deletions src/lib/parquet/src/Flow/Parquet/Option.php
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,14 @@ enum Option
* Default value is 1000
*/
case ROW_GROUP_SIZE_CHECK_INTERVAL;

/**
* There are only two available versions of Parquet format: V1 and V2.
* This option is going to tell the writer which version should be used to create DataPages.
* - 1 will use DataPage
* - 2 will use DataPageV2.
*
* Default 1
*/
case WRITER_VERSION;
}
14 changes: 14 additions & 0 deletions src/lib/parquet/src/Flow/Parquet/Options.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

namespace Flow\Parquet;

use Flow\Parquet\Exception\InvalidArgumentException;

final class Options
{
/**
Expand All @@ -21,6 +23,7 @@ public function __construct()
Option::DICTIONARY_PAGE_SIZE->name => Consts::MB_SIZE,
Option::DICTIONARY_PAGE_MIN_CARDINALITY_RATION->name => 0.4,
Option::GZIP_COMPRESSION_LEVEL->name => 9,
Option::WRITER_VERSION->name => 1,
];
}

Expand All @@ -34,6 +37,17 @@ public function get(Option $option) : bool|int|float
return $this->options[$option->name];
}

public function getInt(Option $option) : int
{
$value = $this->options[$option->name];

if (!\is_int($value)) {
throw new InvalidArgumentException("Option {$option->name} is not an integer, but: " . \gettype($value));
}

return $value;
}

public function set(Option $option, bool|int|float $value) : self
{
$this->options[$option->name] = $value;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ public function decodeRLE(BinaryReader $reader, int $bitWidth, int $intVar, int
$runLength = $intVar >> 1;

if ($runLength === 0) {
$output[] = 0;

return;
}

Expand Down
84 changes: 84 additions & 0 deletions src/lib/parquet/src/Flow/Parquet/ParquetFile/DataCoder.php
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,90 @@ public function decodeData(
throw new RuntimeException('Encoding ' . $encoding->name . ' not supported');
}

public function decodeDataV2(
string $buffer,
Encodings $encoding,
PhysicalType $physicalType,
?LogicalType $logicalType,
int $expectedValuesCount,
int $maxRepetitionsLevel,
int $maxDefinitionsLevel,
?int $typeLength = null,
?Dictionary $dictionary = null
) : ColumnData {
$reader = new BinaryBufferReader($buffer, $this->byteOrder);

$RLEBitPackedHybrid = new RLEBitPackedHybrid();

if ($maxRepetitionsLevel) {
$repetitions = $this->readRLEBitPackedHybrid(
$reader,
$RLEBitPackedHybrid,
BitWidth::calculate($maxRepetitionsLevel),
$expectedValuesCount,
);
} else {
$repetitions = [];
}

if ($maxDefinitionsLevel) {
$definitions = $this->readRLEBitPackedHybrid(
$reader,
$RLEBitPackedHybrid,
BitWidth::calculate($maxDefinitionsLevel),
$expectedValuesCount,
);
} else {
$definitions = [];
}

$nullsCount = \count($definitions) ? \count(\array_filter($definitions, static fn ($definition) => $definition !== $maxDefinitionsLevel)) : 0;

if ($encoding === Encodings::PLAIN) {
return new ColumnData(
$physicalType,
$logicalType,
$repetitions,
$definitions,
$this->readPlainValues(
$physicalType,
$reader,
$expectedValuesCount - $nullsCount,
$logicalType,
$typeLength
)
);
}

if ($encoding === Encodings::RLE_DICTIONARY || $encoding === Encodings::PLAIN_DICTIONARY) {
if (\count($definitions)) {
// while reading indices, there is no length at the beginning since length is simply a remaining length of the buffer
// however we need to know bitWidth which is the first value in the buffer after definitions
$bitWidth = $reader->readBytes(1)->toInt();
/** @var array<int> $indices */
$indices = $this->readRLEBitPackedHybrid(
$reader,
$RLEBitPackedHybrid,
$bitWidth,
$expectedValuesCount - $nullsCount,
);

/** @var array<mixed> $values */
$values = [];

foreach ($indices as $index) {
$values[] = $dictionary?->values[$index];
}
} else {
$values = [];
}

return new ColumnData($physicalType, $logicalType, $repetitions, $definitions, $values);
}

throw new RuntimeException('Encoding ' . $encoding->name . ' not supported');
}

public function decodeDictionary(
string $buffer,
PhysicalType $physicalType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,59 @@
final class DataPageHeaderV2
{
public function __construct(
private readonly Encodings $encoding,
private readonly int $valuesCount,
private readonly int $nullsCount,
private readonly int $rowsCount,
private readonly Encodings $encoding,
private readonly int $definitionsByteLength,
private readonly int $repetitionsByteLength,
private readonly ?bool $isCompressed
) {
}

public static function fromThrift(\Flow\Parquet\Thrift\DataPageHeaderV2 $thrift) : self
{
/** @psalm-suppress DocblockTypeContradiction */
return new self(
$thrift->num_values,
$thrift->num_nulls,
$thrift->num_rows,
Encodings::from($thrift->encoding),
$thrift->num_values
$thrift->definition_levels_byte_length,
$thrift->repetition_levels_byte_length,
/** @phpstan-ignore-next-line */
$thrift->is_compressed ?? null
);
}

public function definitionsByteLength() : int
{
return $this->definitionsByteLength;
}

public function encoding() : Encodings
{
return $this->encoding;
}

public function repetitionsByteLength() : int
{
return $this->repetitionsByteLength;
}

public function toThrift() : \Flow\Parquet\Thrift\DataPageHeaderV2
{
return new \Flow\Parquet\Thrift\DataPageHeaderV2([
'num_values' => $this->valuesCount,
'num_nulls' => $this->nullsCount,
'num_rows' => $this->rowsCount,
'definition_levels_byte_length' => $this->definitionsByteLength,
'repetition_levels_byte_length' => $this->repetitionsByteLength,
'encoding' => $this->encoding->value,
'is_compressed' => $this->isCompressed,
]);
}

public function valuesCount() : int
{
return $this->valuesCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ enum Type : int

public function isDataPage() : bool
{
return $this->value === self::DATA_PAGE->value;
return $this->value === self::DATA_PAGE->value || $this->value === self::DATA_PAGE_V2->value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public function toThrift() : \Flow\Parquet\Thrift\PageHeader
'uncompressed_page_size' => $this->uncompressedPageSize,
'crc' => null,
'data_page_header' => $this->dataPageHeader?->toThrift(),
'data_page_header_v2' => null,
'data_page_header_v2' => $this->dataPageHeaderV2?->toThrift(),
'dictionary_page_header' => $this->dictionaryPageHeader?->toThrift(),
'index_page_header' => null,
]);
Expand Down
67 changes: 52 additions & 15 deletions src/lib/parquet/src/Flow/Parquet/ParquetFile/PageReader.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Flow\Parquet\Options;
use Flow\Parquet\ParquetFile\Page\ColumnData;
use Flow\Parquet\ParquetFile\Page\Dictionary;
use Flow\Parquet\ParquetFile\Page\Header\Type;
use Flow\Parquet\ParquetFile\Page\PageHeader;
use Flow\Parquet\ParquetFile\Schema\FlatColumn;

Expand All @@ -26,25 +27,61 @@ public function __construct(
*/
public function readData(PageHeader $pageHeader, Compressions $codec, ?Dictionary $dictionary, $stream) : ColumnData
{
return (new DataCoder($this->byteOrder))
->decodeData(
(new Codec($this->options))
switch ($pageHeader->type()) {
case Type::DATA_PAGE:
$data = (new Codec($this->options))
->decompress(
/** @phpstan-ignore-next-line */
\fread($stream, $pageHeader->compressedPageSize()),
$codec
),
/** @phpstan-ignore-next-line */
$pageHeader->dataPageHeader()->encoding(),
$this->column->type(),
$this->column->logicalType(),
/** @phpstan-ignore-next-line */
$pageHeader->dataPageHeader()->valuesCount(),
$this->column->maxRepetitionsLevel(),
$this->column->maxDefinitionsLevel(),
$this->column->typeLength(),
$dictionary
);
);

return (new DataCoder($this->byteOrder))
->decodeData(
$data,
/** @phpstan-ignore-next-line */
$pageHeader->dataPageHeader()->encoding(),
$this->column->type(),
$this->column->logicalType(),
/** @phpstan-ignore-next-line */
$pageHeader->dataPageHeader()->valuesCount(),
$this->column->maxRepetitionsLevel(),
$this->column->maxDefinitionsLevel(),
$this->column->typeLength(),
$dictionary
);
case Type::DATA_PAGE_V2:

/* @phpstan-ignore-next-line */
$levelsLength = $pageHeader->dataPageHeaderV2()->repetitionsByteLength() + $pageHeader->dataPageHeaderV2()->definitionsByteLength();
/* @phpstan-ignore-next-line */
$levels = \fread($stream, $levelsLength);

$data = (new Codec($this->options))
->decompress(
/** @phpstan-ignore-next-line */
\fread($stream, $pageHeader->compressedPageSize() - $levelsLength),
$codec
);

return (new DataCoder($this->byteOrder))
->decodeDataV2(
$levels . $data,
/** @phpstan-ignore-next-line */
$pageHeader->dataPageHeaderV2()->encoding(),
$this->column->type(),
$this->column->logicalType(),
/** @phpstan-ignore-next-line */
$pageHeader->dataPageHeaderV2()->valuesCount(),
$this->column->maxRepetitionsLevel(),
$this->column->maxDefinitionsLevel(),
$this->column->typeLength(),
$dictionary
);

default:
throw new RuntimeException("Unknown page header type '{$pageHeader->type()->name}'");
}
}

/**
Expand Down
Loading

0 comments on commit 37a27c9

Please sign in to comment.