Skip to content

Commit

Permalink
Implement new ListType logical type (#749)
Browse files Browse the repository at this point in the history
* Implement new `ListType` logical type

* Improve implementation after review
  • Loading branch information
stloyd committed Nov 7, 2023
1 parent 1d61efe commit 7558da4
Show file tree
Hide file tree
Showing 21 changed files with 230 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
use Flow\ETL\FlowContext;
use Flow\ETL\Loader;
use Flow\ETL\Loader\Closure;
use Flow\ETL\PHP\Type\ObjectType;
use Flow\ETL\PHP\Type\Logical\List\ListElement;
use Flow\ETL\PHP\Type\Native\ObjectType;
use Flow\ETL\Row;
use Flow\ETL\Row\Entry\DateTimeEntry;
use Flow\ETL\Row\Schema;
Expand Down Expand Up @@ -98,10 +99,11 @@ public function load(Rows $rows, FlowContext $context) : void

private function listEntryToValues(Row\Entry\ListEntry $entry) : array
{
/** @var ListElement $listType */
$listType = $entry->definition()->metadata()->get(Schema\FlowMetadata::METADATA_LIST_ENTRY_TYPE);

if ($listType instanceof ObjectType) {
if (\is_a($listType->class, Row\Entry\Type\Uuid::class, true)) {
if ($listType->value() instanceof ObjectType) {
if (\is_a($listType->value()->class, Row\Entry\Type\Uuid::class, true)) {
/** @var array<string> $data */
$data = [];

Expand All @@ -112,7 +114,7 @@ private function listEntryToValues(Row\Entry\ListEntry $entry) : array
return $data;
}

if (\is_a($listType->class, \DateTimeInterface::class, true)) {
if (\is_a($listType->value()->class, \DateTimeInterface::class, true)) {
/** @var array<int> $data */
$data = [];

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
namespace Flow\ETL\Adapter\Avro\FlixTech;

use Flow\ETL\Exception\RuntimeException;
use Flow\ETL\PHP\Type\ObjectType;
use Flow\ETL\PHP\Type\ScalarType;
use Flow\ETL\PHP\Type\Logical\List\ListElement;
use Flow\ETL\PHP\Type\Native\ObjectType;
use Flow\ETL\PHP\Type\Native\ScalarType;
use Flow\ETL\Row\Entry;
use Flow\ETL\Row\Entry\ArrayEntry;
use Flow\ETL\Row\Entry\BooleanEntry;
Expand Down Expand Up @@ -55,23 +56,24 @@ private function convert(Definition $definition) : array
$type = $this->typeFromDefinition($definition);

if ($type === ListEntry::class) {
/** @var ListElement $listType */
$listType = $definition->metadata()->get(Schema\FlowMetadata::METADATA_LIST_ENTRY_TYPE);

if ($listType instanceof ScalarType) {
return match ($listType) {
if ($listType->value() instanceof ScalarType) {
return match ($listType->value()) {
ScalarType::string => ['name' => $definition->entry()->name(), 'type' => ['type' => 'array', 'items' => \AvroSchema::STRING_TYPE]],
ScalarType::integer => ['name' => $definition->entry()->name(), 'type' => ['type' => 'array', 'items' => \AvroSchema::INT_TYPE]],
ScalarType::float => ['name' => $definition->entry()->name(), 'type' => ['type' => 'array', 'items' => \AvroSchema::FLOAT_TYPE]],
ScalarType::boolean => ['name' => $definition->entry()->name(), 'type' => ['type' => 'array', 'items' => \AvroSchema::BOOLEAN_TYPE]],
};
}

if ($listType instanceof ObjectType) {
if (\is_a($listType->class, \DateTimeInterface::class, true)) {
if ($listType->value() instanceof ObjectType) {
if (\is_a($listType->value()->class, \DateTimeInterface::class, true)) {
return ['name' => $definition->entry()->name(), 'type' => ['type' => 'array', 'items' => 'long', \AvroSchema::LOGICAL_TYPE_ATTR => 'timestamp-micros']];
}

throw new RuntimeException("List of {$listType->class} is not supported yet supported.");
throw new RuntimeException("List of {$listType->toString()} is not supported yet supported.");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
namespace Flow\ETL\Adapter\Parquet;

use Flow\ETL\Exception\RuntimeException;
use Flow\ETL\PHP\Type\ObjectType;
use Flow\ETL\PHP\Type\ScalarType;
use Flow\ETL\PHP\Type\Logical\List\ListElement;
use Flow\ETL\PHP\Type\Native\ObjectType;
use Flow\ETL\PHP\Type\Native\ScalarType;
use Flow\ETL\Row\Entry;
use Flow\ETL\Row\Entry\ArrayEntry;
use Flow\ETL\Row\Entry\BooleanEntry;
Expand All @@ -25,7 +26,7 @@
use Flow\Parquet\ParquetFile\Schema as ParquetSchema;
use Flow\Parquet\ParquetFile\Schema\Column;
use Flow\Parquet\ParquetFile\Schema\FlatColumn;
use Flow\Parquet\ParquetFile\Schema\ListElement;
use Flow\Parquet\ParquetFile\Schema\ListElement as ParquetListElement;
use Flow\Parquet\ParquetFile\Schema\NestedColumn;

final class SchemaConverter
Expand Down Expand Up @@ -64,26 +65,27 @@ private function convertEntry(Definition $definition) : Column

private function listEntryToParquet(Definition $definition) : NestedColumn
{
/** @var ListElement $listType */
$listType = $definition->metadata()->get(FlowMetadata::METADATA_LIST_ENTRY_TYPE);

if ($listType instanceof ScalarType) {
if ($listType->value() instanceof ScalarType) {
return NestedColumn::list(
$definition->entry()->name(),
match ($listType) {
ScalarType::string => ListElement::string(),
ScalarType::integer => ListElement::int64(),
ScalarType::float => ListElement::float(),
ScalarType::boolean => ListElement::boolean()
match ($listType->value()) {
ScalarType::string => ParquetListElement::string(),
ScalarType::integer => ParquetListElement::int64(),
ScalarType::float => ParquetListElement::float(),
ScalarType::boolean => ParquetListElement::boolean()
}
);
}

if ($listType instanceof ObjectType) {
if (\is_a($listType->class, \DateTimeInterface::class, true)) {
return NestedColumn::list($definition->entry()->name(), ListElement::datetime());
if ($listType->value() instanceof ObjectType) {
if (\is_a($listType->value()->class, \DateTimeInterface::class, true)) {
return NestedColumn::list($definition->entry()->name(), ParquetListElement::datetime());
}

throw new RuntimeException("List of {$listType->class} is not supported yet supported.");
throw new RuntimeException("List of {$listType->toString()} is not supported yet supported.");
}

/** @phpstan-ignore-next-line */
Expand Down
15 changes: 7 additions & 8 deletions src/core/etl/src/Flow/ETL/DSL/Entry.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@
namespace Flow\ETL\DSL;

use Flow\ETL\Exception\InvalidArgumentException;
use Flow\ETL\PHP\Type\ObjectType;
use Flow\ETL\PHP\Type\ScalarType;
use Flow\ETL\PHP\Type\Logical\List\ListElement;
use Flow\ETL\Row\Entries;
use Flow\ETL\Row\Entry as RowEntry;
use Flow\ETL\Row\Entry\Type\Uuid;
Expand Down Expand Up @@ -158,7 +157,7 @@ final public static function json_string(string $name, string $data) : RowEntry
*/
final public static function list_of_boolean(string $name, array $value) : RowEntry
{
return new RowEntry\ListEntry($name, ScalarType::boolean, $value);
return new RowEntry\ListEntry($name, ListElement::boolean(), $value);
}

/**
Expand All @@ -170,7 +169,7 @@ final public static function list_of_boolean(string $name, array $value) : RowEn
*/
final public static function list_of_datetime(string $name, array $value) : RowEntry
{
return new RowEntry\ListEntry($name, new ObjectType(\DateTimeInterface::class), $value);
return new RowEntry\ListEntry($name, ListElement::object(\DateTimeInterface::class), $value);
}

/**
Expand All @@ -182,7 +181,7 @@ final public static function list_of_datetime(string $name, array $value) : RowE
*/
final public static function list_of_float(string $name, array $value) : RowEntry
{
return new RowEntry\ListEntry($name, ScalarType::float, $value);
return new RowEntry\ListEntry($name, ListElement::float(), $value);
}

/**
Expand All @@ -194,7 +193,7 @@ final public static function list_of_float(string $name, array $value) : RowEntr
*/
final public static function list_of_int(string $name, array $value) : RowEntry
{
return new RowEntry\ListEntry($name, ScalarType::integer, $value);
return new RowEntry\ListEntry($name, ListElement::integer(), $value);
}

/**
Expand All @@ -207,7 +206,7 @@ final public static function list_of_int(string $name, array $value) : RowEntry
*/
final public static function list_of_objects(string $name, string $class, array $value) : RowEntry
{
return new RowEntry\ListEntry($name, new ObjectType($class), $value);
return new RowEntry\ListEntry($name, ListElement::object($class), $value);
}

/**
Expand All @@ -219,7 +218,7 @@ final public static function list_of_objects(string $name, string $class, array
*/
final public static function list_of_string(string $name, array $value) : RowEntry
{
return new RowEntry\ListEntry($name, ScalarType::string, $value);
return new RowEntry\ListEntry($name, ListElement::string(), $value);
}

/**
Expand Down
66 changes: 66 additions & 0 deletions src/core/etl/src/Flow/ETL/PHP/Type/Logical/List/ListElement.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
<?php declare(strict_types=1);

namespace Flow\ETL\PHP\Type\Logical\List;

use Flow\ETL\PHP\Type\Native\ObjectType;
use Flow\ETL\PHP\Type\Native\ScalarType;

/**
 * Describes the type of a single item stored in a list.
 *
 * Wraps exactly one native type - a ScalarType case or an ObjectType - and
 * forwards validation and string conversion to it. Instances are created
 * only through the named constructors below.
 */
final class ListElement
{
    private function __construct(private readonly ScalarType|ObjectType $value)
    {
    }

    /**
     * List element holding ScalarType::boolean.
     */
    public static function boolean() : self
    {
        return new self(ScalarType::boolean);
    }

    /**
     * List element holding ScalarType::float.
     */
    public static function float() : self
    {
        return new self(ScalarType::float);
    }

    /**
     * List element holding ScalarType::integer.
     */
    public static function integer() : self
    {
        return new self(ScalarType::integer);
    }

    /**
     * List element holding an ObjectType built for the given class or interface name.
     *
     * @param class-string $class
     */
    public static function object(string $class) : self
    {
        return new self(new ObjectType($class));
    }

    /**
     * List element holding whatever scalar type ScalarType::fromString() resolves $value to.
     * Handling of unknown names is left entirely to ScalarType::fromString().
     */
    public static function scalar(string $value) : self
    {
        return new self(ScalarType::fromString($value));
    }

    /**
     * List element holding ScalarType::string.
     */
    public static function string() : self
    {
        return new self(ScalarType::string);
    }

    /**
     * Checks whether $value describes the same type as the one wrapped here.
     *
     * Accepts another ListElement (its wrapped type is compared) or a native
     * ScalarType / ObjectType. Any other value is reported as not equal.
     */
    public function isEqual(mixed $value) : bool
    {
        // Two list elements are compared through the native types they wrap.
        if ($value instanceof self) {
            $value = $value->value;
        }

        // The wrapped type's isEqual() is declared with a Type parameter, so forwarding an
        // arbitrary value would raise a TypeError under strict_types instead of answering
        // the question. Only a scalar or object type can ever match the wrapped one.
        if (!$value instanceof ScalarType && !$value instanceof ObjectType) {
            return false;
        }

        return $this->value->isEqual($value);
    }

    /**
     * Checks a single list item against the wrapped type.
     */
    public function isValid(mixed $value) : bool
    {
        return $this->value->isValid($value);
    }

    /**
     * String form of the wrapped type, exactly as produced by its own toString().
     */
    public function toString() : string
    {
        return $this->value->toString();
    }

    /**
     * The wrapped native type.
     */
    public function value() : ScalarType|ObjectType
    {
        return $this->value;
    }
}
65 changes: 65 additions & 0 deletions src/core/etl/src/Flow/ETL/PHP/Type/Logical/ListType.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
<?php declare(strict_types=1);

namespace Flow\ETL\PHP\Type\Logical;

use Flow\ETL\PHP\Type\Logical\List\ListElement;
use Flow\ETL\PHP\Type\Type;
use Flow\Serializer\Serializable;

/**
 * Logical type for a list whose items all share one element type.
 *
 * @implements Serializable<array{element: ListElement}>
 */
final class ListType implements Serializable, Type
{
    public function __construct(private readonly ListElement $element)
    {
    }

    /**
     * Serialized state consists of the element description only.
     */
    public function __serialize() : array
    {
        return ['element' => $this->element];
    }

    /**
     * Restores the element description from the serialized state.
     */
    public function __unserialize(array $data) : void
    {
        $this->element = $data['element'];
    }

    /**
     * Description of the items this list accepts.
     */
    public function element() : ListElement
    {
        return $this->element;
    }

    /**
     * Another type is equal only when it is also a ListType and both element
     * descriptions render to the same string.
     */
    public function isEqual(Type $type) : bool
    {
        return $type instanceof self
            && $this->element->toString() === $type->element()->toString();
    }

    /**
     * A value is valid when it is a list-shaped array (an empty array counts)
     * and every item passes the element's own validation.
     */
    public function isValid(mixed $value) : bool
    {
        // array_is_list() already reports true for an empty array,
        // so no separate empty-array case is required here.
        if (!\is_array($value) || !\array_is_list($value)) {
            return false;
        }

        foreach ($value as $candidate) {
            if (!$this->element->isValid($candidate)) {
                return false;
            }
        }

        return true;
    }

    /**
     * String form of this type; currently identical to the element's string form.
     */
    public function toString() : string
    {
        return $this->element->toString();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@

declare(strict_types=1);

namespace Flow\ETL\PHP\Type;
namespace Flow\ETL\PHP\Type\Native;

use Flow\ETL\Exception\InvalidArgumentException;
use Flow\ETL\PHP\Type\Type;

final class ObjectType implements Type
{
Expand All @@ -13,21 +14,11 @@ final class ObjectType implements Type
*/
public function __construct(public readonly string $class)
{
if (!\class_exists($class) && !\interface_exists($this->class)) {
if (!\class_exists($class) && !\interface_exists($class)) {
throw new InvalidArgumentException("Class {$class} not found");
}
}

/**
* @psalm-suppress MoreSpecificImplementedParamType
*
* @param class-string $value
*/
public static function fromString(string $value) : self
{
return new self($value);
}

/**
* @param class-string $class
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@

declare(strict_types=1);

namespace Flow\ETL\PHP\Type;
namespace Flow\ETL\PHP\Type\Native;

use Flow\ETL\Exception\InvalidArgumentException;
use Flow\ETL\PHP\Type\Type;

enum ScalarType : string implements Type
{
Expand Down
2 changes: 0 additions & 2 deletions src/core/etl/src/Flow/ETL/PHP/Type/Type.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@

interface Type
{
public static function fromString(string $value) : self;

public function isEqual(self $type) : bool;

public function isValid(mixed $value) : bool;
Expand Down
Loading

0 comments on commit 7558da4

Please sign in to comment.