Skip to content

Commit

Permalink
Simplified creating schema for parquet files (#621)
Browse files Browse the repository at this point in the history
  • Loading branch information
norberttech authored Oct 21, 2023
1 parent 4028b21 commit 924e2d5
Show file tree
Hide file tree
Showing 23 changed files with 624 additions and 81 deletions.
2 changes: 1 addition & 1 deletion src/lib/parquet/resources/python/generators/lists.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def generate_list_nested():
os.remove(parquet_file)

# Write the PyArrow Table to a Parquet file
with pq.ParquetWriter(parquet_file, schema, compression='GZIP') as writer:
with pq.ParquetWriter(parquet_file, schema, compression='SNAPPY') as writer:
writer.write_table(table)

pd.set_option('display.max_columns', None) # Show all columns
Expand Down
2 changes: 1 addition & 1 deletion src/lib/parquet/resources/python/generators/maps.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ def generate_map_of_struct_of_structs(n_rows):
os.remove(parquet_file)

# Write the PyArrow Table to a Parquet file
with pq.ParquetWriter(parquet_file, schema, compression='GZIP') as writer:
with pq.ParquetWriter(parquet_file, schema, compression='SNAPPY') as writer:
writer.write_table(table)

# Show the first few rows of the DataFrame for verification
Expand Down
2 changes: 1 addition & 1 deletion src/lib/parquet/resources/python/generators/orders.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,4 @@
table = pa.table(df, schema=schema)

# Write out as Parquet file with Snappy compression
pq.write_table(table, 'output/orders.parquet', compression='gzip')
pq.write_table(table, 'output/orders.parquet', compression='SNAPPY')
22 changes: 21 additions & 1 deletion src/lib/parquet/resources/python/generators/primitives.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from enum import Enum
import pyarrow as pa
import pyarrow.parquet as pq
from decimal import Decimal


# Number of rows to generate
n_rows = 100
Expand All @@ -27,6 +29,9 @@ class Color(Enum):
time_col = pd.Series([time(hour=i % 24, minute=(i * 2) % 60, second=(i * 3) % 60) for i in range(n_rows)], dtype='object')
uuid_col = pd.Series([str(uuid.uuid4()) for _ in range(n_rows)], dtype='string')
enum_col = pd.Series([random.choice(list(Color)).name for _ in range(n_rows)], dtype='string')
float_col = pd.Series([random.uniform(0, 100) for _ in range(n_rows)], dtype='float32')
double_col = pd.Series([random.uniform(0, 100) for _ in range(n_rows)], dtype='float64')
decimal_col = pd.Series([Decimal(str(round(random.uniform(0, 100), 2))) for i in range(n_rows)])

int32_nullable_col = pd.Series([i if i % 2 == 0 else None for i in range(n_rows)], dtype='Int32')
int64_nullable_col = pd.Series([i if i % 2 == 0 else None for i in range(n_rows)], dtype='Int64')
Expand All @@ -38,6 +43,9 @@ class Color(Enum):
time_nullable_col = pd.Series([time(hour=i % 24, minute=(i * 2) % 60, second=(i * 3) % 60) if i % 2 == 0 else None for i in range(n_rows)], dtype='object')
uuid_nullable_col = pd.Series([str(uuid.uuid4()) if i % 2 == 0 else None for i in range(n_rows)], dtype='string')
enum_nullable_col = pd.Series([random.choice(list(Color)).name if i % 2 == 0 else None for i in range(n_rows)], dtype='string')
float_nullable_col = pd.Series([random.uniform(0, 100) if i % 2 == 0 else None for i in range(n_rows)], dtype='float32')
double_nullable_col = pd.Series([random.uniform(0, 100) if i % 2 == 0 else None for i in range(n_rows)], dtype='float64')
decimal_nullable_col = pd.Series([Decimal(str(round(random.uniform(0, 100), 2))) if i % 2 == 0 else None for i in range(n_rows)])

# Creating the DataFrame with all of the generated columns
df_nested_list = pd.DataFrame({
Expand All @@ -61,6 +69,12 @@ class Color(Enum):
'uuid_nullable': uuid_nullable_col,
'enum': enum_col,
'enum_nullable': enum_nullable_col,
'float': float_col,
'float_nullable': float_nullable_col,
'double': double_col,
'double_nullable': double_nullable_col,
'decimal': decimal_col,
'decimal_nullable': decimal_nullable_col,
})

# Define the schema
Expand All @@ -85,6 +99,12 @@ class Color(Enum):
('uuid_nullable', pa.string()),
('enum', pa.string()),
('enum_nullable', pa.string()),
('float', pa.float32()),
('float_nullable', pa.float32()),
('double', pa.float64()),
('double_nullable', pa.float64()),
('decimal', pa.decimal128(10, 2)),
('decimal_nullable', pa.decimal128(10, 2)),
])

# Create a PyArrow Table
Expand All @@ -98,7 +118,7 @@ class Color(Enum):
os.remove(parquet_file)

# Write the PyArrow Table to a Parquet file
with pq.ParquetWriter(parquet_file, schema, compression='GZIP') as writer:
with pq.ParquetWriter(parquet_file, schema, compression='SNAPPY') as writer:
writer.write_table(table)

pd.set_option('display.max_columns', None) # Show all columns
Expand Down
2 changes: 1 addition & 1 deletion src/lib/parquet/resources/python/generators/structs.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ def generate_struct_deeply_nested():
os.remove(parquet_file)

# Write the PyArrow Table to a Parquet file
with pq.ParquetWriter(parquet_file, schema, compression='GZIP') as writer:
with pq.ParquetWriter(parquet_file, schema, compression='SNAPPY') as writer:
writer.write_table(table)

pd.set_option('display.max_columns', None) # Show all columns
Expand Down
2 changes: 2 additions & 0 deletions src/lib/parquet/src/Flow/Parquet/BinaryReader.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public function readByteArrays(int $total) : array;

public function readBytes(int $total) : Bytes;

public function readDecimals(int $total, int $byteLength) : array;

public function readDouble() : float;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,26 @@ public function readBytes(int $total) : Bytes
return new Bytes($bytes);
}

/**
 * Reads $total fixed-width decimals and returns their unscaled integer values.
 *
 * Each value occupies exactly $byteLength bytes and is assembled in big-endian
 * order (first byte is the most significant). Values are interpreted as two's
 * complement, which is how Parquet encodes DECIMAL in FIXED_LEN_BYTE_ARRAY, so
 * negative decimals narrower than a native PHP integer are sign-extended.
 * No scale is applied here; callers receive the raw unscaled integers.
 *
 * NOTE(review): assumes Bytes::toArray() yields integers in the 0-255 range - confirm.
 * NOTE(review): when $byteLength exceeds PHP_INT_SIZE only the low-order bytes
 * contribute, so values that do not fit a native integer are not representable.
 *
 * @param int $total number of decimals to read
 * @param int $byteLength width of a single decimal in bytes
 *
 * @return array<int> unscaled decimal values, in read order
 */
public function readDecimals(int $total, int $byteLength) : array
{
    $decimalBytes = \array_chunk($this->readBytes($byteLength * $total)->toArray(), $byteLength);

    // Widths equal to (or above) the native integer size already land on the
    // sign bit naturally; only narrower widths need explicit sign extension.
    $needsSignExtension = $byteLength < \PHP_INT_SIZE;
    $valueBits = $byteLength * 8;

    $decimals = [];

    foreach ($decimalBytes as $bytes) {
        $intValue = 0;

        foreach ($bytes as $i => $byte) {
            $shift = ($byteLength - 1 - $i) * 8;
            $intValue |= ($byte << $shift);
        }

        // The most significant bit of the first byte is the two's complement sign bit.
        if ($needsSignExtension && ($bytes[0] & 0x80) !== 0) {
            // Fill every bit above the value width with ones to make the integer negative.
            $intValue |= (-1 << $valueBits);
        }

        $decimals[] = $intValue;
    }

    return $decimals;
}

public function readDouble() : float
{
$result = \unpack(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ private function enrichData(mixed $value, Column $column) : mixed

if ($column->logicalType()?->name() === 'TIMESTAMP') {
/** @phpstan-ignore-next-line */
if ($column->logicalType()?->timestamp()?->nanos() && $this->options->get(Option::ROUND_NANOSECONDS) === false) {
if ($column->logicalType()?->timestampData()?->nanos() && $this->options->get(Option::ROUND_NANOSECONDS) === false) {
return $value;
}

Expand All @@ -114,9 +114,9 @@ private function enrichData(mixed $value, Column $column) : mixed
}

/** @phpstan-ignore-next-line */
$isMillis = (bool) $column->logicalType()?->timestamp()?->millis();
$isMillis = (bool) $column->logicalType()?->timestampData()?->millis();
/** @phpstan-ignore-next-line */
$isMicros = ($column->logicalType()?->timestamp()?->micros() || $column->logicalType()?->timestamp()?->nanos());
$isMicros = ($column->logicalType()?->timestampData()?->micros() || $column->logicalType()?->timestampData()?->nanos());

$convertValue = static function (int $val) use ($isMillis, $isMicros) : \DateTimeImmutable|int {
if ($isMillis) {
Expand Down
23 changes: 18 additions & 5 deletions src/lib/parquet/src/Flow/Parquet/ParquetFile/DataCoder.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public function decodeData(
int $expectedValuesCount,
int $maxRepetitionsLevel,
int $maxDefinitionsLevel,
?int $typeLength = null,
?Dictionary $dictionary = null
) : ColumnData {
$reader = new BinaryBufferReader($buffer, $this->byteOrder);
Expand Down Expand Up @@ -85,6 +86,7 @@ public function decodeData(
$reader,
$expectedValuesCount - $nullsCount,
$logicalType,
$typeLength
)
);
}
Expand Down Expand Up @@ -121,13 +123,19 @@ public function decodeData(
throw new RuntimeException('Encoding ' . $encoding->name . ' not supported');
}

public function decodeDictionary(string $buffer, PhysicalType $physicalType, ?LogicalType $logicalType, Encodings $encoding, int $expectedValuesCount) : Dictionary
{
/**
 * Decodes a dictionary buffer into a Dictionary.
 *
 * The values are always read through readPlainValues(); $encoding is only
 * forwarded to the debug logger.
 *
 * @param string $buffer bytes to decode
 * @param PhysicalType $physicalType physical type of the stored values
 * @param null|LogicalType $logicalType logical type of the column, if any
 * @param Encodings $encoding encoding reported for the dictionary (used for debug logging)
 * @param int $expectedValuesCount number of values to read from the buffer
 * @param null|int $typeLength value width passed through to readPlainValues()
 */
public function decodeDictionary(
    string $buffer,
    PhysicalType $physicalType,
    ?LogicalType $logicalType,
    Encodings $encoding,
    int $expectedValuesCount,
    ?int $typeLength = null
) : Dictionary {
    $reader = new BinaryBufferReader($buffer, $this->byteOrder);
    $this->debugLogDictionaryDecode($buffer, $encoding, $physicalType);

    $values = $this->readPlainValues(
        $physicalType,
        $reader,
        $expectedValuesCount,
        $logicalType,
        $typeLength
    );

    return new Dictionary($values);
}

Expand Down Expand Up @@ -215,8 +223,9 @@ private function debugLogRLEBitPackedHybridPre(int $bitWidth, int $expectedValue
$this->logger->debug('Decoding data with RLE Hybrid', ['bitWidth' => $bitWidth, 'expected_values_count' => $expectedValuesCount, 'reader_position' => ['bits' => $reader->position()->bits(), 'bytes' => $reader->position()->bytes()]]);
}

private function readPlainValues(PhysicalType $physicalType, BinaryBufferReader $reader, int $total, ?LogicalType $logicalType) : array
private function readPlainValues(PhysicalType $physicalType, BinaryBufferReader $reader, int $total, ?LogicalType $logicalType, ?int $typeLength) : array
{
/** @psalm-suppress PossiblyNullArgument */
return match ($physicalType) {
PhysicalType::INT32 => $reader->readInts32($total),
PhysicalType::INT64 => $reader->readInts64($total),
Expand All @@ -230,7 +239,11 @@ private function readPlainValues(PhysicalType $physicalType, BinaryBufferReader
false => $reader->readByteArrays($total)
}
},
PhysicalType::FIXED_LEN_BYTE_ARRAY => throw new RuntimeException('FIXED_LEN_BYTE_ARRAY type is not yet supported'),//(array)\unpack('H*', $buffer),
PhysicalType::FIXED_LEN_BYTE_ARRAY => match ($logicalType?->name()) {
/** @phpstan-ignore-next-line */
LogicalType::DECIMAL => $reader->readDecimals($total, $typeLength),
default => throw new RuntimeException('Unsupported logical type ' . ($logicalType?->name() ?: 'null') . ' for FIXED_LEN_BYTE_ARRAY'),
},
PhysicalType::BOOLEAN => $reader->readBooleans($total),
};
}
Expand Down
2 changes: 2 additions & 0 deletions src/lib/parquet/src/Flow/Parquet/ParquetFile/PageReader.php
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public function readData(PageHeader $pageHeader, Compressions $codec, ?Dictionar
$pageHeader->dataPageHeader()->valuesCount(),
$this->column->maxRepetitionsLevel(),
$this->column->maxDefinitionsLevel(),
$this->column->typeLength(),
$dictionary
);
}
Expand Down Expand Up @@ -74,6 +75,7 @@ public function readDictionary(PageHeader $pageHeader, Compressions $codec, $str
$this->column->logicalType(),
$pageHeader->dictionaryPageHeader()->encoding(),
$pageHeader->dictionaryPageHeader()->valuesCount(),
$this->column->typeLength(),
);
}
}
49 changes: 16 additions & 33 deletions src/lib/parquet/src/Flow/Parquet/ParquetFile/Schema.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
use Flow\Parquet\ParquetFile\Schema\Column;
use Flow\Parquet\ParquetFile\Schema\FlatColumn;
use Flow\Parquet\ParquetFile\Schema\NestedColumn;
use Flow\Parquet\ParquetFile\Schema\Repetition;
use Flow\Parquet\Thrift\SchemaElement;

final class Schema
Expand All @@ -19,12 +18,20 @@ final class Schema
/**
* @param array<Column> $columns
*/
public function __construct(
private function __construct(
private readonly FlatColumn $schemaRoot,
private readonly array $columns
) {
}

/**
 * Creates a schema from the given columns.
 *
 * The schema root is a boolean FlatColumn named "schema".
 *
 * @param Column ...$columns columns that make up the schema
 */
public static function from(Column ...$columns) : self
{
    $schemaRoot = FlatColumn::boolean('schema');

    return new self($schemaRoot, $columns);
}

/**
* @param array<SchemaElement> $schemaElements
*/
Expand All @@ -35,7 +42,7 @@ public static function fromThrift(array $schemaElements) : self
}

return new self(
FlatColumn::fromThrift(\array_shift($schemaElements), 0, 0),
FlatColumn::fromThrift(\array_shift($schemaElements)),
self::processSchema($schemaElements)
);
}
Expand Down Expand Up @@ -136,62 +143,38 @@ private static function processSchema(
array $schemaElements,
int &$index = 0,
?string $rootPath = null,
int $maxDefinitionLevel = 0,
int $maxRepetitionLevel = 0,
int $childrenCount = null,
Column $parent = null
) : array {
$columns = [];

$processed_children = 0;
$processedChildren = 0;

while ($index < \count($schemaElements) && ($childrenCount === null || $processed_children < $childrenCount)) {
while ($index < \count($schemaElements) && ($childrenCount === null || $processedChildren < $childrenCount)) {
$elem = $schemaElements[$index];
$index++;

$currentMaxDefLevel = $maxDefinitionLevel;
$currentMaxRepLevel = $maxRepetitionLevel;

// Update maxDefinitionLevel and maxRepetitionLevel based on the repetition type of the current element
if ($elem->repetition_type !== Repetition::REQUIRED->value) {
$currentMaxDefLevel++;
}

if ($elem->repetition_type === Repetition::REPEATED->value) {
$currentMaxRepLevel++;
}

$root = FlatColumn::fromThrift($elem, $currentMaxDefLevel, $currentMaxRepLevel, $rootPath, $parent);
$root = FlatColumn::fromThrift($elem, $rootPath, $parent);

if ($elem->num_children > 0) {
$nestedColumn = new NestedColumn($root, [], $currentMaxDefLevel, $currentMaxRepLevel, $parent); // create NestedColumn with empty children first
$nestedColumn = new NestedColumn($root, [], $parent);
$children = self::processSchema(
$schemaElements,
$index,
$root->flatPath(),
$currentMaxDefLevel,
$currentMaxRepLevel,
$elem->num_children,
$nestedColumn // pass the NestedColumn as the parent
$nestedColumn
);

// now update the children of the NestedColumn
$nestedColumn->setChildren($children);

$nestedMaxDefLevel = $currentMaxDefLevel;
$nestedMaxRepLevel = $currentMaxRepLevel;

foreach ($children as $child) {
$nestedMaxDefLevel = \max($nestedMaxDefLevel, $child->maxDefinitionsLevel());
$nestedMaxRepLevel = \max($nestedMaxRepLevel, $child->maxRepetitionsLevel());
}

$columns[] = $nestedColumn; // use the updated NestedColumn
} else {
$columns[] = $root;
}

$processed_children++;
$processedChildren++;
}

return $columns;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,9 @@ public function parent() : ?self;

public function repetition() : ?Repetition;

public function setParent(NestedColumn $parent) : void;

public function type() : PhysicalType;

public function typeLength() : ?int;
}
Loading

0 comments on commit 924e2d5

Please sign in to comment.