Skip to content

Commit

Permalink
Fixed bug in RLE/bitpacking hybrid algorithm (#640)
Browse files Browse the repository at this point in the history
* Fixed bug in RLE/bitpacking hybrid algorithm

* CS Fixes
  • Loading branch information
norberttech authored Oct 24, 2023
1 parent 3ea4acc commit d1e85e6
Show file tree
Hide file tree
Showing 14 changed files with 188 additions and 93 deletions.
1 change: 1 addition & 0 deletions phpunit.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
<directory>src/adapter/etl-adapter-xml/tests/Flow/ETL/Adapter/XML/Tests/Integration</directory>
<directory>src/core/etl/tests/Flow/ETL/Tests/Integration</directory>
<directory>src/lib/parquet/tests/Flow/Parquet/Tests/Integration</directory>
<directory>src/lib/dremel/tests/Flow/Dremel/Tests/Integration</directory>
<directory>src/lib/snappy/tests/Flow/Snappy/Tests/Integration</directory>
</testsuite>
<testsuite name="integration-services">
Expand Down
26 changes: 26 additions & 0 deletions src/lib/dremel/tests/Flow/Dremel/Tests/Integration/DremelTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<?php declare(strict_types=1);

namespace Flow\Dremel\Tests\Integration;

use Flow\Dremel\Dremel;
use PHPUnit\Framework\TestCase;

/**
 * Integration test for the Dremel shredder/assembler: a list-of-strings column
 * is shredded into repetition/definition levels and then assembled back.
 */
final class DremelTest extends TestCase
{
    /**
     * Shreds ten records of strings, checks the produced levels against the
     * expected ones, then assembles the shredded output and expects the
     * original records back unchanged.
     */
    public function test_dremel_shredding_and_assembling() : void
    {
        // Ten records holding 2, 3, 3, 7, 8, 9, 8, 7, 3 and 5 strings (55 in total).
        $records = [
            [
                'Suscipit officiis dolorum ea omnis est id magnam.',
                'Ea rerum saepe a minima non iusto.',
            ],
            [
                'Id dolor et et repellendus.',
                'Cumque facilis aut quos et.',
                'Sit illum ipsam dolor voluptatem est.',
            ],
            [
                'Commodi dicta rerum quas omnis sunt dolor.',
                'Architecto sint corrupti nihil soluta nesciunt.',
                'Accusamus libero aliquam rerum.',
            ],
            [
                'Eum molestias reiciendis cumque ad animi.',
                'Sunt ad magnam quas dolores possimus sint aut.',
                'Quidem cupiditate doloremque aut esse non.',
                'Consequatur nobis delectus aut.',
                'Quo fuga fugiat nulla dolor non fugit dolorum.',
                'Voluptate ex culpa deleniti est eum qui.',
                'Quia sunt quia ut consequatur et optio et.',
            ],
            [
                'Aut soluta corrupti laborum qui.',
                'Officia maiores natus voluptatem provident aut.',
                'Voluptatem modi sequi molestiae aut molestiae.',
                'Cumque qui voluptas quia.',
                'Quis esse ut odio commodi quae.',
                'Voluptatem est accusantium est et eum.',
                'Ratione et ut fuga qui atque sed et.',
                'Et aut ut quidem provident excepturi placeat.',
            ],
            [
                'Rerum molestiae dicta libero dolorem.',
                'Expedita fuga sequi a maiores quasi.',
                'Nesciunt qui similique et.',
                'Architecto perferendis qui sequi sint qui nemo.',
                'Sequi in atque tenetur.',
                'Voluptatem quod et placeat cupiditate.',
                'Qui qui laborum consequatur quos cum totam.',
                'Saepe sit quae eos accusamus.',
                'Qui illum dolor vel consequuntur nihil.',
            ],
            [
                'Vel tenetur velit quas.',
                'Natus autem ab beatae nihil recusandae.',
                'Ut quasi voluptatum qui dolore ut.',
                'Ducimus et minima voluptatem cum sint non.',
                'Rerum tenetur sunt quidem est et modi et.',
                'Vitae sit eum eius rerum possimus.',
                'Eos ipsa est a aliquid impedit doloremque nisi.',
                'Aut illum quam sit asperiores.',
            ],
            [
                'Repellat dolore sit ad amet sed repudiandae.',
                'Quam nemo cum quo culpa.',
                'Omnis sed minima vero.',
                'Esse qui quo cumque earum eius nulla.',
                'Sed in adipisci quas fuga.',
                'Dolor est aliquid tempora.',
                'Ut expedita id suscipit ut voluptatem.',
            ],
            [
                'Sint ipsa et autem ut id vitae.',
                'Sapiente ut ab qui.',
                // This fixture string contains a line break; written as an escape so it stays visible.
                "Ullam sit numquam \nqui perferendis aut.",
            ],
            [
                'Qui illum id nam quia quibusdam vero.',
                'Quas laboriosam perferendis temporibus vero.',
                'Numquam quas deserunt est et eius.',
                'Voluptas debitis incidunt ea minus.',
                'Pariatur ipsa ipsa sequi ut est dolor adipisci.',
            ],
        ];

        // One row per record: 0 for the first string of a record, 1 for each following one.
        $expectedRepetitions = [
            0, 1,
            0, 1, 1,
            0, 1, 1,
            0, 1, 1, 1, 1, 1, 1,
            0, 1, 1, 1, 1, 1, 1, 1,
            0, 1, 1, 1, 1, 1, 1, 1, 1,
            0, 1, 1, 1, 1, 1, 1, 1,
            0, 1, 1, 1, 1, 1, 1,
            0, 1, 1,
            0, 1, 1, 1, 1,
        ];

        // Every one of the 55 strings is expected at definition level 3 (rows of ten, then five).
        $expectedDefinitions = [
            3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
            3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
            3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
            3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
            3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
            3, 3, 3, 3, 3,
        ];

        $maxDefinitionLevel = \max($expectedDefinitions);

        $dremel = new Dremel();
        $shredded = $dremel->shred($records, $maxDefinitionLevel);

        $this->assertSame($expectedRepetitions, $shredded->repetitions);
        $this->assertSame($expectedDefinitions, $shredded->definitions);

        // assemble() is consumed through iterator_to_array(), so collect it before comparing.
        $assembled = $dremel->assemble($shredded->repetitions, $shredded->definitions, $shredded->values);

        $this->assertSame($records, \iterator_to_array($assembled));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ public function readFloats(int $total) : array
$floats = [];

foreach ($floatBytes as $bytes) {
$floats[] = \unpack($this->byteOrder === ByteOrder::LITTLE_ENDIAN ? 'g' : 'G', \pack('C*', ...$bytes))[1];
$floats[] = \round(\unpack($this->byteOrder === ByteOrder::LITTLE_ENDIAN ? 'g' : 'G', \pack('C*', ...$bytes))[1], 7);
}

return $floats;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ public function writeDoubles(array $doubles) : void

foreach ($doubles as $double) {
$this->buffer .= \pack($format, $double);
$this->length->addBytes(8);
}
$this->length->addBytes(\count($doubles) * 8);
}

public function writeFloats(array $floats) : void
Expand All @@ -116,8 +116,8 @@ public function writeFloats(array $floats) : void

foreach ($floats as $float) {
$this->buffer .= \pack($format, $float);
$this->length->addBytes(4); // A float is 4 bytes
}
$this->length->addBytes(\count($floats) * 4);
}

public function writeInts32(array $ints) : void
Expand Down
10 changes: 10 additions & 0 deletions src/lib/parquet/src/Flow/Parquet/Consts.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?php declare(strict_types=1);

namespace Flow\Parquet;

/**
 * Shared integer limits for the Flow\Parquet library.
 */
final class Consts
{
    /**
     * Largest value of a signed 32-bit integer (2^31 - 1).
     */
    public const PHP_INT32_MAX = 2_147_483_647;

    /**
     * Largest value of a signed 64-bit integer (2^63 - 1).
     */
    public const PHP_INT64_MAX = 9_223_372_036_854_775_807;
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ final class BytesConverter
public static function binToHex(string $binaryData, int $limit = null, string $glue = ' ') : string
{
if ($limit === null) {
return \implode($glue, \str_split(\bin2hex($binaryData), 2));
return \implode($glue, \str_split(\strtoupper(\bin2hex($binaryData)), 2));
}

return \implode($glue, \array_slice(\str_split(\bin2hex($binaryData), 2), 0, $limit));
return \implode($glue, \array_slice(\str_split(\strtoupper(\bin2hex($binaryData)), 2), 0, $limit));
}

public static function intToBin(int $number, int $bits = 32, int $bitsPerGroup = 4) : string
Expand Down
16 changes: 0 additions & 16 deletions src/lib/parquet/src/Flow/Parquet/ParquetFile/Data/DataBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -51,22 +51,6 @@ private function enrichData(mixed $value, Column $column) : mixed
return null;
}

if ($column->type() === PhysicalType::FLOAT) {
if (\is_float($value)) {
return \round($value, 7);
}

if (\is_array($value)) {
$enriched = [];

foreach ($value as $val) {
$enriched[] = \round($val, 7);
}

return $enriched;
}
}

if ($column->type() === PhysicalType::INT96 && $this->options->get(Option::INT_96_AS_DATETIME)) {
if (\is_array($value) && \count($value) && !\is_array($value[0])) {
return $this->nanoToDateTimeImmutable($value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,25 +159,35 @@ public function encodeHybrid(BinaryWriter $writer, array $values) : void

$previousValue = null;

foreach ($values as $value) {
foreach ($values as $i => $value) {
if ($previousValue === null) {
$previousValue = $value;
$rleBuffer[] = $value;

continue;
}

// we always bit-pack a multiple of 8 values at a time, so we only store the number of values / 8
// we always bit-pack a multiple of 8 values at a time, so we only store the number of "values / 8"
if (\count($bitPackedBuffer) > 0 && \count($bitPackedBuffer) < 8) {
$bitPackedBuffer[] = $value;

continue;
}

if (\count($bitPackedBuffer) % 8 === 0) {
$this->encodeBitPacked($writer, $bitWidth, $bitPackedBuffer);
$bitPackedBuffer = [];
}

if ($previousValue === $value) {
$rleBuffer[] = $value;
} else {
if (\count($rleBuffer) >= 8) {
if (\count($bitPackedBuffer)) {
$this->encodeBitPacked($writer, $bitWidth, $bitPackedBuffer);
$bitPackedBuffer = [];
}

$this->encodeRLE($writer, $bitWidth, $rleBuffer);
$rleBuffer = [];
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,10 @@ public function flush(int $fileOffset) : RowGroupContainer
$chunkContainers = [];

foreach ($this->chunkBuilders as $chunkBuilder) {
$chunkContainer = $chunkBuilder->flush($fileOffset);
$fileOffset += \strlen($chunkContainer->binaryBuffer);
$chunkContainers[] = $chunkContainer;
foreach ($chunkBuilder->flush($fileOffset) as $chunkContainer) {
$fileOffset += \strlen($chunkContainer->binaryBuffer);
$chunkContainers[] = $chunkContainer;
}
}

$buffer = '';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,31 +22,43 @@ public function addRow(mixed $data) : void

/**
* @psalm-suppress PossiblyNullArgument
*
* @return array<ColumnChunkContainer>
*/
public function flush(int $fileOffset) : ColumnChunkContainer
public function flush(int $fileOffset) : array
{
$pageContainer = (new DataPagesBuilder($this->data))->build($this->column);
$offset = $fileOffset;
$columnChunkContainers = [];
$previousChunkData = null;

foreach (\array_chunk($this->data, 1000) as $dataChunk) {
$pageContainer = (new DataPagesBuilder($dataChunk))->build($this->column);

$columnChunkContainers[] = new ColumnChunkContainer(
$pageContainer->pageHeaderBuffer . $pageContainer->dataBuffer,
new ColumnChunk(
$this->column->type(),
Compressions::UNCOMPRESSED,
/** @phpstan-ignore-next-line */
$pageContainer->pageHeader->dataValuesCount(),
$offset,
$this->column->path(),
[
Encodings::PLAIN,
],
\strlen($pageContainer->dataBuffer) + \strlen($pageContainer->pageHeaderBuffer),
\strlen($pageContainer->dataBuffer) + \strlen($pageContainer->pageHeaderBuffer),
dictionaryPageOffset: null,
dataPageOffset: $offset,
indexPageOffset: null,
)
);

$offset += \strlen($pageContainer->pageHeaderBuffer) + \strlen($pageContainer->dataBuffer);
}

$this->data = [];

return new ColumnChunkContainer(
$pageContainer->pageHeaderBuffer . $pageContainer->dataBuffer,
new ColumnChunk(
$this->column->type(),
Compressions::UNCOMPRESSED,
/** @phpstan-ignore-next-line */
$pageContainer->pageHeader->dataValuesCount(),
$fileOffset,
$this->column->path(),
[
Encodings::PLAIN,
],
\strlen($pageContainer->dataBuffer) + \strlen($pageContainer->pageHeaderBuffer),
\strlen($pageContainer->dataBuffer) + \strlen($pageContainer->pageHeaderBuffer),
dictionaryPageOffset: null,
dataPageOffset: $fileOffset,
indexPageOffset: null,
)
);
return $columnChunkContainers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public function __construct(private readonly array $rows)
public function build(FlatColumn $column) : DataPageContainer
{
$shredded = (new Dremel())->shred($this->rows, $column->maxDefinitionsLevel());

$rleBitPackedHybrid = new RLEBitPackedHybrid();

$pageBuffer = '';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,19 @@ public function test_writing_and_reading_decimals(array $decimals, int $precisio
);
}

/**
 * Writes a short list of floats through BinaryBufferWriter and reads the same
 * number of floats back with BinaryBufferReader, comparing with a tolerance
 * rather than for exact equality.
 */
public function test_writing_and_reading_floats() : void
{
    $expected = [1.1, 2.2, 3.3, 4.4, 9.1];
    $binary = '';

    // NOTE(review): the writer presumably fills $binary by reference - confirm against BinaryBufferWriter.
    $writer = new BinaryBufferWriter($binary);
    $writer->writeFloats($expected);

    // The reader is created only after writing so that it sees the populated buffer.
    $reader = new BinaryBufferReader($binary);
    $actual = $reader->readFloats(\count($expected));

    $this->assertEqualsWithDelta($expected, $actual, 0.000001);
}

public function test_writing_and_reading_strings() : void
{
$buffer = '';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

namespace Flow\Parquet\Tests\Integration\IO;

use Faker\Factory;
use Flow\Parquet\Consts;
use Flow\Parquet\ParquetFile\Schema;
use Flow\Parquet\ParquetFile\Schema\FlatColumn;
use Flow\Parquet\ParquetFile\Schema\ListElement;
Expand Down Expand Up @@ -34,59 +36,45 @@ public function test_writer() : void
NestedColumn::list('list_of_strings', ListElement::string())
);

$writer->write($path, $schema, $inputData = [
[
'boolean' => true,
'int32' => 32,
'int64' => 64,
'float' => 2.2,
'double' => 2.2,
'decimal' => 10.24,
'string' => 'string',
'date' => (new \DateTimeImmutable())->setTime(0, 0),
'datetime' => new \DateTimeImmutable(),
'list_of_datetimes' => [
new \DateTimeImmutable('+1 second'),
new \DateTimeImmutable('+2 second'),
new \DateTimeImmutable('+3 second'),
],
'map_of_ints' => [
'a' => 0,
'b' => 1,
'c' => 2,
],
'list_of_strings' => ['string_00_00'],
],
[
'boolean' => false,
'int32' => 150,
'int64' => 64,
'float' => 2.2,
'double' => 2.2,
'decimal' => 10.24,
'string' => 'string',
'date' => (new \DateTimeImmutable())->setTime(0, 0),
'datetime' => new \DateTimeImmutable(),
'list_of_datetimes' => [
new \DateTimeImmutable('+1 second'),
new \DateTimeImmutable('+2 second'),
new \DateTimeImmutable('+3 second'),
],
'map_of_ints' => [
'd' => 3,
'e' => 4,
'f' => 5,
$faker = Factory::create();

$inputData = \array_map(static function (int $i) use ($faker) : array {
return [
[
'boolean' => $faker->boolean,
'int32' => $faker->numberBetween(0, Consts::PHP_INT32_MAX),
'int64' => $faker->numberBetween(0, PHP_INT_MAX),
'float' => 10.25,
'double' => $faker->randomFloat(),
'decimal' => \round($faker->randomFloat(5), 2),
'string' => $faker->text(50),
'date' => \DateTimeImmutable::createFromMutable($faker->dateTime)->setTime(0, 0, 0, 0),
'datetime' => \DateTimeImmutable::createFromMutable($faker->dateTime),
'list_of_datetimes' => [
\DateTimeImmutable::createFromMutable($faker->dateTime),
\DateTimeImmutable::createFromMutable($faker->dateTime),
\DateTimeImmutable::createFromMutable($faker->dateTime),
],
'map_of_ints' => [
'a' => $faker->numberBetween(0, Consts::PHP_INT32_MAX),
'b' => $faker->numberBetween(0, Consts::PHP_INT32_MAX),
'c' => $faker->numberBetween(0, Consts::PHP_INT32_MAX),
],
'list_of_strings' => \array_map(static fn (int $i) => $faker->text(50), \range(0, \random_int(1, 10))),
],
'list_of_strings' => ['string_01_00', 'string_01_01', 'string_01_02'],
],
]);
];
}, \range(1, 100));

$inputData = \array_merge(...$inputData);

$writer->write($path, $schema, $inputData);

$reader = new Reader();
$file = $reader->read($path);

$this->assertEquals(
$inputData,
\iterator_to_array($file->values())
\iterator_to_array($file->values()),
);
}
}
Loading

0 comments on commit d1e85e6

Please sign in to comment.