Skip to content

Commit

Permalink
Simplified schema creation for Parquet files
Browse files Browse the repository at this point in the history
  • Loading branch information
norberttech committed Oct 21, 2023
1 parent c70fd10 commit fb9c2ee
Show file tree
Hide file tree
Showing 21 changed files with 599 additions and 303 deletions.
2 changes: 1 addition & 1 deletion src/lib/parquet/src/Flow/Parquet/ParquetFile.php
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public function metadata() : Metadata
/**
* @psalm-suppress PossiblyInvalidArgument
*/
public function readChunks(Column $column, int $limit = null) : \Generator
public function readChunks(FlatColumn $column, int $limit = null) : \Generator
{
$reader = new WholeChunk(
new DataBuilder($this->options, $this->logger),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
namespace Flow\Parquet\ParquetFile;

use Flow\Parquet\ParquetFile\RowGroup\ColumnChunk;
use Flow\Parquet\ParquetFile\Schema\Column;
use Flow\Parquet\ParquetFile\Schema\FlatColumn;

interface ColumnChunkReader
{
/**
* @param resource $stream
*/
public function read(ColumnChunk $columnChunk, Column $column, $stream, int $limit = null) : \Generator;
public function read(ColumnChunk $columnChunk, FlatColumn $column, $stream, int $limit = null) : \Generator;
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use Flow\Parquet\ParquetFile\PageReader;
use Flow\Parquet\ParquetFile\RowGroup\ColumnChunk;
use Flow\Parquet\ParquetFile\Schema\Column;
use Flow\Parquet\ParquetFile\Schema\FlatColumn;
use Flow\Parquet\ThriftStream\TPhpFileStream;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
Expand All @@ -28,7 +29,7 @@ public function __construct(
/**
* @param resource $stream
*/
public function read(ColumnChunk $columnChunk, Column $column, $stream, int $limit = null) : \Generator
public function read(ColumnChunk $columnChunk, FlatColumn $column, $stream, int $limit = null) : \Generator
{
$this->logger->debug('[Parquet File][Read Column][Read Column Chunk]', ['chunk' => $columnChunk->normalize()]);
$offset = $columnChunk->pageOffset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

namespace Flow\Parquet\ParquetFile\Page;

use Flow\Parquet\ParquetFile\Schema\Column;
use Flow\Parquet\ParquetFile\Schema\FlatColumn;
use Flow\Parquet\ParquetFile\Schema\LogicalType;
use Flow\Parquet\ParquetFile\Schema\PhysicalType;

Expand All @@ -24,7 +24,7 @@ public function __construct(
) {
}

public static function initialize(Column $column) : self
public static function initialize(FlatColumn $column) : self
{
return new self($column->type(), $column->logicalType(), [], [], []);
}
Expand Down
4 changes: 2 additions & 2 deletions src/lib/parquet/src/Flow/Parquet/ParquetFile/PageReader.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@
use Flow\Parquet\ParquetFile\Page\ColumnData;
use Flow\Parquet\ParquetFile\Page\Dictionary;
use Flow\Parquet\ParquetFile\Page\PageHeader;
use Flow\Parquet\ParquetFile\Schema\Column;
use Flow\Parquet\ParquetFile\Schema\FlatColumn;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;

final class PageReader
{
public function __construct(
private readonly Column $column,
private readonly FlatColumn $column,
private readonly Options $options,
private readonly ByteOrder $byteOrder,
private readonly LoggerInterface $logger = new NullLogger()
Expand Down
97 changes: 42 additions & 55 deletions src/lib/parquet/src/Flow/Parquet/ParquetFile/Schema.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,35 +15,40 @@ final class Schema
*/
private array $cache = [];

/**
* @param array<Column> $columns
*/
private function __construct(
private readonly FlatColumn $schemaRoot,
private readonly array $columns
public function __construct(
private readonly NestedColumn $schemaRoot,
) {
}

public static function from(Column ...$columns) : self
{
return new self(
FlatColumn::boolean('schema'),
$columns
);
}

/**
* @param array<SchemaElement> $schemaElements
*/
public static function fromThrift(array $schemaElements) : self
{
if (!\count($schemaElements)) {
throw new \InvalidArgumentException('Schema must have at least one element');
throw new InvalidArgumentException('Schema must have at least one element');
}

$schema = self::processSchema($schemaElements);

if (\count($schema) !== 1) {
throw new InvalidArgumentException('Schema must have exactly one root element');
}

if (!$schema[0] instanceof NestedColumn) {
throw new InvalidArgumentException('Schema must be a NestedColumn');
}

return new self($schema[0]);
}

public static function with(Column ...$columns) : self
{
return new self(
FlatColumn::fromThrift(\array_shift($schemaElements)),
self::processSchema($schemaElements)
NestedColumn::struct(
'schema',
$columns,
)
);
}

Expand All @@ -52,7 +57,7 @@ public static function fromThrift(array $schemaElements) : self
*/
public function columns() : array
{
return $this->columns;
return $this->schemaRoot->children();
}

public function get(string $flatPath) : Column
Expand Down Expand Up @@ -90,7 +95,7 @@ public function get(string $flatPath) : Column
return null;
};

$column = $getByFlatPath($flatPath, $this->columns);
$column = $getByFlatPath($flatPath, $this->schemaRoot->children());

if ($column instanceof Column) {
$this->cache[$flatPath] = $column;
Expand All @@ -116,7 +121,7 @@ public function toDDL() : array
{
return [$this->schemaRoot->name() => [
'type' => 'message',
'children' => $this->generateDDL($this->columns),
'children' => $this->generateDDL($this->schemaRoot->children()),
]];
}

Expand All @@ -139,44 +144,26 @@ private function generateDDL(array $columns) : array
*
* @return array<Column>
*/
private static function processSchema(
array $schemaElements,
int &$index = 0,
?string $rootPath = null,
int $childrenCount = null,
Column $parent = null
) : array {
$columns = [];

$processedChildren = 0;

while ($index < \count($schemaElements) && ($childrenCount === null || $processedChildren < $childrenCount)) {
$elem = $schemaElements[$index];
$index++;

$root = FlatColumn::fromThrift($elem, $rootPath, $parent);

if ($elem->num_children > 0) {
$nestedColumn = new NestedColumn($root, [], $parent);
$children = self::processSchema(
$schemaElements,
$index,
$root->flatPath(),
$elem->num_children,
$nestedColumn
);

// now update the children of the NestedColumn
$nestedColumn->setChildren($children);

$columns[] = $nestedColumn; // use the updated NestedColumn
} else {
$columns[] = $root;
private static function processSchema(array $schemaElements, int &$index = 0) : array
{
$element = $schemaElements[$index];
$index++;

if ($element->num_children) {
$children = [];

for ($i = 0; $i < $element->num_children; $i++) {
$children = \array_merge($children, self::processSchema($schemaElements, $index));
}

$processedChildren++;
return [
NestedColumn::fromThrift(
$element,
$children,
),
];
}

return $columns;
return [FlatColumn::fromThrift($element)];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public function repetition() : ?Repetition;

public function setParent(NestedColumn $parent) : void;

public function type() : PhysicalType;
public function type() : ?PhysicalType;

public function typeLength() : ?int;
}
59 changes: 43 additions & 16 deletions src/lib/parquet/src/Flow/Parquet/ParquetFile/Schema/FlatColumn.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
*/
final class FlatColumn implements Column
{
private ?NestedColumn $parent = null;

public function __construct(
private readonly string $name,
private readonly PhysicalType $type,
Expand All @@ -20,8 +22,6 @@ public function __construct(
private readonly ?int $precision = null,
private readonly ?int $scale = null,
private readonly ?int $typeLength = null,
private ?string $rootPath = null,
private ?Column $parent = null
) {
}

Expand Down Expand Up @@ -61,26 +61,26 @@ public static function double(string $name) : self
return new self($name, PhysicalType::DOUBLE, null, Repetition::OPTIONAL);
}

public static function enum(string $string) : self
{
return new self($string, PhysicalType::BYTE_ARRAY, new LogicalType(LogicalType::STRING), Repetition::OPTIONAL);
}

public static function float(string $name) : self
{
return new self($name, PhysicalType::FLOAT, null, Repetition::OPTIONAL);
}

public static function fromThrift(
SchemaElement $thrift,
?string $rootPath = null,
?Column $parent = null
) : self {
public static function fromThrift(SchemaElement $thrift) : self
{
return new self(
$thrift->name,
PhysicalType::from((int) $thrift->type),
PhysicalType::from($thrift->type),
$thrift->logicalType === null ? null : LogicalType::fromThrift($thrift->logicalType),
$thrift->repetition_type === null ? null : Repetition::from($thrift->repetition_type),
$thrift->precision,
$thrift->scale,
$thrift->type_length,
$rootPath,
$parent
);
}

Expand All @@ -94,22 +94,35 @@ public static function int64(string $name) : self
return new self($name, PhysicalType::INT64, null, Repetition::OPTIONAL);
}

public static function json(string $string) : self
{
return new self($string, PhysicalType::BYTE_ARRAY, new LogicalType(LogicalType::STRING), Repetition::OPTIONAL);
}

public static function string(string $name) : self
{
return new self($name, PhysicalType::BYTE_ARRAY, new LogicalType(LogicalType::STRING), Repetition::OPTIONAL);
}

public static function time(string $name) : self
{
return new self($name, PhysicalType::INT32, new LogicalType(LogicalType::TIME), Repetition::OPTIONAL);
return new self($name, PhysicalType::INT64, new LogicalType(LogicalType::TIME), Repetition::OPTIONAL);
}

public static function uuid(string $string) : self
{
return new self($string, PhysicalType::BYTE_ARRAY, new LogicalType(LogicalType::STRING), Repetition::OPTIONAL);
}

public function __debugInfo() : ?array
{
return $this->normalize();
}

/** @psalm-suppress PossiblyNullOperand */
/**
* @psalm-suppress PossiblyNullOperand
* @psalm-suppress PossiblyNullPropertyFetch
*/
public function ddl() : array
{
return [
Expand All @@ -121,11 +134,18 @@ public function ddl() : array

public function flatPath() : string
{
if ($this->rootPath !== null) {
return $this->rootPath . '.' . $this->name;
$path = [$this->name];
$parent = $this->parent();

while ($parent) {
$path[] = $parent->name();
$parent = $parent->parent();
}

return $this->name;
$path = \array_reverse($path);
\array_shift($path);

return \implode('.', $path);
}

public function isList() : bool
Expand Down Expand Up @@ -211,6 +231,11 @@ public function logicalType() : ?LogicalType
return $this->logicalType;
}

public function makeRequired() : self
{
return new self($this->name, $this->type, $this->logicalType, Repetition::REQUIRED, $this->precision, $this->scale, $this->typeLength);
}

public function maxDefinitionsLevel() : int
{
$level = $this->repetition === Repetition::REQUIRED ? 0 : 1;
Expand All @@ -230,6 +255,9 @@ public function name() : string
return $this->name;
}

/**
* @psalm-suppress PossiblyNullPropertyFetch
*/
public function normalize() : array
{
return [
Expand Down Expand Up @@ -275,7 +303,6 @@ public function scale() : ?int

public function setParent(NestedColumn $parent) : void
{
$this->rootPath = $parent->flatPath();
$this->parent = $parent;
}

Expand Down
Loading

0 comments on commit fb9c2ee

Please sign in to comment.