Skip to content

Commit

Permalink
Boolean encoding and RLEBitPackedHybrid (#704)
Browse files Browse the repository at this point in the history
  • Loading branch information
norberttech authored Nov 1, 2023
1 parent e4239e8 commit 167cc76
Show file tree
Hide file tree
Showing 11 changed files with 129 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ public function encodeHybrid(BinaryWriter $writer, array $values) : void
continue;
}

if (\count($bitPackedBuffer) % 8 === 0) {
if (\count($bitPackedBuffer) && \count($bitPackedBuffer) % 8 === 0) {
$this->encodeBitPacked($writer, $bitWidth, $bitPackedBuffer);
$bitPackedBuffer = [];
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ final class DataPageHeader
{
/**
 * @param Encodings $encoding encoding reported for the page, serialized as Thrift `encoding`
 * @param Encodings $repetitionLevelEncoding encoding reported for repetition levels, serialized as Thrift `repetition_level_encoding`
 * @param Encodings $definitionLevelEncoding encoding reported for definition levels, serialized as Thrift `definition_level_encoding`
 * @param int $valuesCount number of values in the page, serialized as Thrift `num_values`
 */
public function __construct(
private readonly Encodings $encoding,
private readonly Encodings $repetitionLevelEncoding,
private readonly Encodings $definitionLevelEncoding,
private readonly int $valuesCount,
) {
}
Expand All @@ -20,22 +22,34 @@ public static function fromThrift(\Flow\Parquet\Thrift\DataPageHeader $thrift) :
{
return new self(
Encodings::from($thrift->encoding),
Encodings::from($thrift->repetition_level_encoding),
Encodings::from($thrift->definition_level_encoding),
$thrift->num_values
);
}

/**
 * Returns the encoding this header carries for definition levels.
 */
public function definitionLevelEncoding() : Encodings
{
return $this->definitionLevelEncoding;
}

/**
 * Returns the encoding this header carries in its `encoding` field.
 */
public function encoding() : Encodings
{
return $this->encoding;
}

/**
 * Returns the encoding this header carries for repetition levels.
 */
public function repetitionLevelEncoding() : Encodings
{
return $this->repetitionLevelEncoding;
}

/**
 * Converts this header into its Thrift representation.
 *
 * The level encodings are taken from the values held by this object, so a
 * header created through fromThrift() serializes back with the same encodings.
 */
public function toThrift() : \Flow\Parquet\Thrift\DataPageHeader
{
    return new \Flow\Parquet\Thrift\DataPageHeader([
        'num_values' => $this->valuesCount,
        'encoding' => $this->encoding->value,
        'definition_level_encoding' => $this->definitionLevelEncoding->value,
        'repetition_level_encoding' => $this->repetitionLevelEncoding->value,
    ]);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,10 @@ public function build(FlatColumn $column, array $rows, ?array $dictionary = null
\strlen($compressedBuffer),
\strlen($pageBuffer),
dataPageHeader: new DataPageHeader(
$dictionary && $indices ? Encodings::RLE_DICTIONARY : Encodings::PLAIN,
\count($shredded->definitions)
encoding: (\is_array($dictionary) && \is_array($indices)) ? Encodings::RLE_DICTIONARY : Encodings::PLAIN,
repetitionLevelEncoding: Encodings::RLE,
definitionLevelEncoding: Encodings::RLE,
valuesCount: \count($shredded->definitions)
),
dataPageHeaderV2: null,
dictionaryPageHeader: null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public function build(FlatColumn $column, array $rows) : PageContainer
dataPageHeader: null,
dataPageHeaderV2: null,
dictionaryPageHeader: new DictionaryPageHeader(
Encodings::PLAIN,
Encodings::PLAIN_DICTIONARY,
\count($dictionary->dictionary)
),
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Flow\Parquet\ParquetFile\RowGroupBuilder;

use Flow\Parquet\Exception\InvalidArgumentException;
use Flow\Parquet\Exception\RuntimeException;
use Flow\Parquet\ParquetFile\Encodings;
use Flow\Parquet\ParquetFile\Page\Header\Type;

Expand Down Expand Up @@ -91,9 +92,19 @@ public function encodings() : array
}

foreach ($this->dataPageContainers as $pageContainer) {
$encodings[] = $pageContainer->pageHeader->encoding()->value;
$dataPageHeader = $pageContainer->pageHeader->dataPageHeader();

if ($dataPageHeader === null) {
throw new RuntimeException('Data page header not set for DataPage container');
}

$encodings[] = $dataPageHeader->repetitionLevelEncoding()->value;
$encodings[] = $dataPageHeader->definitionLevelEncoding()->value;
$encodings[] = $dataPageHeader->encoding()->value;
}

$encodings = \array_unique($encodings);

return \array_map(static fn (int $encoding) => Encodings::from($encoding), $encodings);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Flow\Parquet\ParquetFile\RowGroupBuilder\PageBuilder\DataPageBuilder;
use Flow\Parquet\ParquetFile\RowGroupBuilder\PageBuilder\DictionaryPageBuilder;
use Flow\Parquet\ParquetFile\Schema\FlatColumn;
use Flow\Parquet\ParquetFile\Schema\PhysicalType;

final class PagesBuilder
{
Expand All @@ -24,19 +25,21 @@ public function build(FlatColumn $column, array $rows, ColumnChunkStatistics $st
{
$containers = new PageContainers();

if ($statistics->cardinalityRation() <= $this->options->get(Option::DICTIONARY_PAGE_MIN_CARDINALITY_RATION)) {
$dictionaryPageContainer = (new DictionaryPageBuilder($this->dataConverter, $this->compression, $this->options))->build($column, $rows);
if ($column->type() !== PhysicalType::BOOLEAN) {
if ($statistics->cardinalityRation() <= $this->options->get(Option::DICTIONARY_PAGE_MIN_CARDINALITY_RATION)) {
$dictionaryPageContainer = (new DictionaryPageBuilder($this->dataConverter, $this->compression, $this->options))->build($column, $rows);

if ($dictionaryPageContainer->dataSize() <= $this->options->get(Option::DICTIONARY_PAGE_SIZE)) {
$containers->add($dictionaryPageContainer);
if ($dictionaryPageContainer->dataSize() <= $this->options->get(Option::DICTIONARY_PAGE_SIZE)) {
$containers->add($dictionaryPageContainer);

$containers->add(
(new DataPageBuilder($this->dataConverter, $this->compression, $this->options))->build($column, $rows, $dictionaryPageContainer->dictionary, $dictionaryPageContainer->values)
);
$containers->add(
(new DataPageBuilder($this->dataConverter, $this->compression, $this->options))->build($column, $rows, $dictionaryPageContainer->dictionary, $dictionaryPageContainer->values)
);

return $containers;
return $containers;
}
$dictionaryPageContainer = null;
}
$dictionaryPageContainer = null;
}

/* @phpstan-ignore-next-line */
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<?php declare(strict_types=1);

namespace Flow\Parquet\ParquetFile\Schema;

/**
 * Integer-backed enumeration of Parquet converted types.
 *
 * The backing value of each case is the integer stored in the Thrift
 * SchemaElement `converted_type` field, so changing a value changes what
 * is read from and written to Parquet metadata.
 *
 * Cases are declared in alphabetical order, not in order of their value.
 */
enum ConvertedType : int
{
case BSON = 20;
case DATE = 6;
case DECIMAL = 5;
case ENUM = 4;
case INT_16 = 16;
case INT_32 = 17;
case INT_64 = 18;
case INT_8 = 15;
case INTERVAL = 21;
case JSON = 19;
case LIST = 3;
case MAP = 1;
case MAP_KEY_VALUE = 2;
case TIME_MICROS = 8;
case TIME_MILLIS = 7;
case TIMESTAMP_MICROS = 10;
case TIMESTAMP_MILLIS = 9;
case UINT_16 = 12;
case UINT_32 = 13;
case UINT_64 = 14;
case UINT_8 = 11;
case UTF8 = 0;
}
30 changes: 17 additions & 13 deletions src/lib/parquet/src/Flow/Parquet/ParquetFile/Schema/FlatColumn.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ final class FlatColumn implements Column
public function __construct(
private readonly string $name,
private readonly PhysicalType $type,
private readonly ?ConvertedType $convertedType = null,
private readonly ?LogicalType $logicalType = null,
private readonly ?Repetition $repetition = Repetition::OPTIONAL,
private readonly ?int $precision = null,
Expand All @@ -31,12 +32,12 @@ public function __construct(

/**
 * Creates an optional BOOLEAN column without a converted or logical type.
 */
public static function boolean(string $name) : self
{
    return new self($name, PhysicalType::BOOLEAN, null, null, Repetition::OPTIONAL);
}

/**
 * Creates an optional INT32 column annotated as a date, using both the
 * DATE converted type and the DATE logical type.
 */
public static function date(string $name) : self
{
    return new self(
        $name,
        PhysicalType::INT32,
        ConvertedType::DATE,
        new LogicalType(LogicalType::DATE),
        Repetition::OPTIONAL
    );
}

public static function dateTime(string $name, TimeUnit $timeUnit = TimeUnit::MICROSECONDS) : self
Expand All @@ -49,7 +50,7 @@ public static function dateTime(string $name, TimeUnit $timeUnit = TimeUnit::MIC
TimeUnit::MICROSECONDS => new Timestamp(false, false, true, false),
};

return new self($name, PhysicalType::INT64, new LogicalType(LogicalType::TIMESTAMP, $timestamp), Repetition::OPTIONAL);
return new self($name, PhysicalType::INT64, ConvertedType::TIMESTAMP_MICROS, new LogicalType(LogicalType::TIMESTAMP, $timestamp), Repetition::OPTIONAL);
}

public static function decimal(string $name, int $precision = 10, int $scale = 2) : self
Expand All @@ -68,6 +69,7 @@ public static function decimal(string $name, int $precision = 10, int $scale = 2
return new self(
$name,
PhysicalType::FIXED_LEN_BYTE_ARRAY,
ConvertedType::DECIMAL,
LogicalType::decimal($scale, $precision),
Repetition::OPTIONAL,
$precision,
Expand All @@ -78,24 +80,25 @@ public static function decimal(string $name, int $precision = 10, int $scale = 2

/**
 * Creates an optional DOUBLE column without a converted or logical type.
 */
public static function double(string $name) : self
{
    return new self($name, PhysicalType::DOUBLE, null, null, Repetition::OPTIONAL);
}

/**
 * Creates an optional BYTE_ARRAY column with the ENUM converted type
 * and the STRING logical type.
 */
public static function enum(string $string) : self
{
    return new self(
        $string,
        PhysicalType::BYTE_ARRAY,
        ConvertedType::ENUM,
        new LogicalType(LogicalType::STRING),
        Repetition::OPTIONAL
    );
}

/**
 * Creates an optional FLOAT column without a converted or logical type.
 */
public static function float(string $name) : self
{
    return new self($name, PhysicalType::FLOAT, null, null, Repetition::OPTIONAL);
}

public static function fromThrift(SchemaElement $thrift) : self
{
return new self(
$thrift->name,
PhysicalType::from($thrift->type),
$thrift->converted_type === null ? null : ConvertedType::from($thrift->converted_type),
$thrift->logicalType === null ? null : LogicalType::fromThrift($thrift->logicalType),
$thrift->repetition_type === null ? null : Repetition::from($thrift->repetition_type),
$thrift->precision,
Expand All @@ -106,7 +109,7 @@ public static function fromThrift(SchemaElement $thrift) : self

/**
 * Creates an optional INT32 column with the INT_32 converted type
 * and no logical type.
 */
public static function int32(string $name) : self
{
    return new self($name, PhysicalType::INT32, ConvertedType::INT_32, null, Repetition::OPTIONAL);
}

public static function int64(string $name) : self
Expand All @@ -115,27 +118,27 @@ public static function int64(string $name) : self
throw new InvalidArgumentException('PHP_INT_MAX must be equal to ' . Consts::PHP_INT64_MAX . ' to support 64-bit timestamps.');
}

return new self($name, PhysicalType::INT64, null, Repetition::OPTIONAL);
return new self($name, PhysicalType::INT64, ConvertedType::INT_64, null, Repetition::OPTIONAL);
}

/**
 * Creates an optional BYTE_ARRAY column with the JSON converted type
 * and the STRING logical type.
 */
public static function json(string $string) : self
{
    return new self(
        $string,
        PhysicalType::BYTE_ARRAY,
        ConvertedType::JSON,
        new LogicalType(LogicalType::STRING),
        Repetition::OPTIONAL
    );
}

/**
 * Creates an optional BYTE_ARRAY column with the UTF8 converted type
 * and the STRING logical type.
 */
public static function string(string $name) : self
{
    return new self(
        $name,
        PhysicalType::BYTE_ARRAY,
        ConvertedType::UTF8,
        new LogicalType(LogicalType::STRING),
        Repetition::OPTIONAL
    );
}

/**
 * Creates an optional INT64 column with the TIME_MICROS converted type
 * and the TIME logical type.
 */
public static function time(string $name) : self
{
    return new self(
        $name,
        PhysicalType::INT64,
        ConvertedType::TIME_MICROS,
        new LogicalType(LogicalType::TIME),
        Repetition::OPTIONAL
    );
}

/**
 * Creates an optional BYTE_ARRAY column with the STRING logical type
 * and no converted type.
 */
public static function uuid(string $string) : self
{
    return new self(
        $string,
        PhysicalType::BYTE_ARRAY,
        null,
        new LogicalType(LogicalType::STRING),
        Repetition::OPTIONAL
    );
}

public function __debugInfo() : ?array
Expand Down Expand Up @@ -271,7 +274,7 @@ public function logicalType() : ?LogicalType

/**
 * Returns a new column identical to this one except that its repetition
 * is REQUIRED. Name, physical type, converted type, logical type,
 * precision, scale and type length are carried over unchanged.
 */
public function makeRequired() : self
{
    return new self(
        $this->name,
        $this->type,
        $this->convertedType,
        $this->logicalType,
        Repetition::REQUIRED,
        $this->precision,
        $this->scale,
        $this->typeLength
    );
}

public function maxDefinitionsLevel() : int
Expand Down Expand Up @@ -352,6 +355,7 @@ public function toThrift() : SchemaElement
return new SchemaElement([
'name' => $this->name,
'type' => $this->type->value,
'converted_type' => $this->convertedType?->value,
'repetition_type' => $this->repetition?->value,
'logicalType' => $this->logicalType?->toThrift(),
'precision' => $this->precision,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public function __construct(
private readonly string $name,
private readonly ?Repetition $repetition,
private readonly array $children,
private readonly ?ConvertedType $convertedType = null,
private readonly ?LogicalType $logicalType = null,
public readonly bool $schemaRoot = false
) {
Expand Down Expand Up @@ -46,6 +47,7 @@ public static function fromThrift(SchemaElement $schemaElement, array $children)
$schemaElement->name,
$schemaElement->repetition_type ? Repetition::from($schemaElement->repetition_type) : null,
$children,
$schemaElement->converted_type ? ConvertedType::from($schemaElement->converted_type) : null,
/** @phpstan-ignore-next-line */
$schemaElement->logicalType ? LogicalType::fromThrift($schemaElement->logicalType) : null
);
Expand All @@ -63,6 +65,7 @@ public static function list(string $name, ListElement $element) : self
[$element->element]
),
],
ConvertedType::LIST,
new LogicalType(LogicalType::LIST)
);
}
Expand All @@ -82,6 +85,7 @@ public static function map(string $name, MapKey $key, MapValue $value) : self
],
),
],
ConvertedType::MAP,
new LogicalType(LogicalType::MAP)
);
}
Expand All @@ -91,7 +95,7 @@ public static function map(string $name, MapKey $key, MapValue $value) : self
*/
public static function schemaRoot(string $name, array $children) : self
{
    // The root is a REQUIRED group with no converted or logical type;
    // the trailing `true` sets the schemaRoot flag.
    return new self($name, Repetition::REQUIRED, $children, null, null, true);
}

/**
Expand Down Expand Up @@ -395,7 +399,7 @@ public function toThrift() : array
new SchemaElement([
'name' => $this->name(),
'num_children' => \count($this->children),
'converted_type' => null,
'converted_type' => $this->convertedType?->value,
'repetition_type' => $this->repetition()?->value,
'logicalType' => $this->logicalType()?->toThrift(),
]),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,23 @@ public function test_flushing_buffers_when_they_are_dividable_by_8() : void
);
}

/**
 * Round-trips a run of 100 identical values through the hybrid
 * encoder and decoder and expects the input back unchanged.
 */
public function test_plain_long_rle() : void
{
    $expected = \array_fill(0, 100, 1);

    // The writer appends to $encoded, which the reader below consumes.
    $encoded = '';
    $writer = new BinaryBufferWriter($encoded);
    (new RLEBitPackedHybrid())->encodeHybrid($writer, $expected);

    $decoded = (new RLEBitPackedHybrid())->decodeHybrid(
        new BinaryBufferReader($encoded),
        BitWidth::fromArray($expected),
        \count($expected)
    );

    $this->assertSame($expected, $decoded);
}

public function test_plain_rle_with_two_sequences() : void
{
$values = [1, 1, 1, 1, 1, 1, 1, 1, 1, 1];
Expand Down
Loading

0 comments on commit 167cc76

Please sign in to comment.