Skip to content

Commit

Permalink
Improved support for structures (#587)
Browse files Browse the repository at this point in the history
  • Loading branch information
norberttech authored Oct 16, 2023
1 parent 37d509d commit 724b6aa
Show file tree
Hide file tree
Showing 26 changed files with 606 additions and 218 deletions.
1 change: 1 addition & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
},
"require-dev": {
"aeon-php/calendar": "^1.0",
"fakerphp/faker": "^1.23",
"fig/log-test": "^1.1",
"jawira/case-converter": "^3.4",
"laravel/serializable-closure": "^1.1",
Expand Down
70 changes: 69 additions & 1 deletion composer.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public function closure(Rows $rows, FlowContext $context) : void
}

$context->streams()->close($this->path);
$this->writer = null;
}

public function destination() : Path
Expand Down Expand Up @@ -88,6 +89,7 @@ public function load(Rows $rows, FlowContext $context) : void
$rowData[$entry->name()] = match (\get_class($entry)) {
Row\Entry\ListEntry::class => $this->listEntryToValues($entry),
DateTimeEntry::class => (int) $entry->value()->format('Uu'),
Row\Entry\UuidEntry::class => $entry->value()->toString(),
Row\Entry\EnumEntry::class => $entry->value()->name,
default => $entry->value(),
};
Expand All @@ -102,6 +104,22 @@ private function listEntryToValues(Row\Entry\ListEntry $entry) : array
$listType = $entry->definition()->metadata()->get(Schema\FlowMetadata::METADATA_LIST_ENTRY_TYPE);

if ($listType instanceof ObjectType) {
if (\is_a($listType->class, Row\Entry\Type\Uuid::class, true)) {
/** @var array<string> $data */
$data = [];

/**
* @psalm-suppress MixedAssignment
* @psalm-suppress MixedMethodCall
*/
foreach ($entry->value() as $value) {
/** @phpstan-ignore-next-line */
$data[] = $value->toString();
}

return $data;
}

if (\is_a($listType->class, \DateTimeInterface::class, true)) {
/** @var array<int> $data */
$data = [];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@
use Flow\ETL\Row\Entry\ListEntry;
use Flow\ETL\Row\Entry\NullEntry;
use Flow\ETL\Row\Entry\StringEntry;
use Flow\ETL\Row\Entry\StructureEntry;
use Flow\ETL\Row\Entry\TypedCollection\ObjectType;
use Flow\ETL\Row\Entry\TypedCollection\ScalarType;
use Flow\ETL\Row\Entry\UuidEntry;
use Flow\ETL\Row\Schema;
use Flow\ETL\Row\Schema\Definition;
use Flow\ETL\Row\Schema\FlowMetadata;

final class SchemaConverter
{
Expand All @@ -34,20 +37,7 @@ public function toAvroJsonSchema(Schema $schema) : string
);
}

if (\count($definition->types()) === 2 && $definition->isNullable()) {
/** @var class-string<Entry> $type */
$type = \current(\array_diff($definition->types(), [NullEntry::class]));
$fields[] = $this->convertType($type, $definition);
}

if (\count($definition->types()) === 1) {
$type = \current($definition->types());
$fields[] = $this->convertType($type, $definition);
}

if ((\count($definition->types()) === 2 && !$definition->isNullable()) || \count($definition->types()) > 2) {
throw new RuntimeException('Union types are not supported yet. Invalid type: ' . $definition->entry()->name());
}
$fields[] = $this->convert($definition);
}

return \json_encode([
Expand All @@ -63,8 +53,10 @@ public function toAvroJsonSchema(Schema $schema) : string
*
* @return array{name: string, type: string}
*/
private function convertType(string $type, Definition $definition) : array
private function convert(Definition $definition) : array
{
$type = $this->typeFromDefinition($definition);

if ($type === ListEntry::class) {
$listType = $definition->metadata()->get(Schema\FlowMetadata::METADATA_LIST_ENTRY_TYPE);

Expand All @@ -86,8 +78,30 @@ private function convertType(string $type, Definition $definition) : array
}
}

if ($type === StructureEntry::class) {
/** @var array<string, Definition> $structureDefinitions */
$structureDefinitions = $definition->metadata()->get(FlowMetadata::METADATA_STRUCTURE_DEFINITIONS);

$structConverter = function (array $definitions) use (&$structConverter) : array {
$structureFields = [];

/** @var Definition $definition */
foreach ($definitions as $name => $definition) {
if (!\is_array($definition)) {
$structureFields[] = $this->convert($definition);
} else {
$structureFields[] = ['name' => $name, 'type' => ['name' => \ucfirst($name), 'type' => \AvroSchema::RECORD_SCHEMA, 'fields' => $structConverter($definition)]];
}
}

return $structureFields;
};

return ['name' => $definition->entry()->name(), 'type' => ['name' => \ucfirst($definition->entry()->name()), 'type' => \AvroSchema::RECORD_SCHEMA, 'fields' => $structConverter($structureDefinitions)]];
}

return match ($type) {
StringEntry::class, JsonEntry::class => ['name' => $definition->entry()->name(), 'type' => \AvroSchema::STRING_TYPE],
StringEntry::class, JsonEntry::class, UuidEntry::class => ['name' => $definition->entry()->name(), 'type' => \AvroSchema::STRING_TYPE],
EnumEntry::class => [
'name' => $definition->entry()->name(),
'type' => [
Expand All @@ -107,4 +121,18 @@ private function convertType(string $type, Definition $definition) : array
default => throw new RuntimeException($type . ' is not yet supported.')
};
}

/**
 * Resolves the single entry class that a schema definition should be converted from.
 *
 * A definition that is nullable and lists exactly two types resolves to the
 * first type left after removing NullEntry; a definition listing exactly one
 * type resolves to that type. Every other combination is rejected.
 *
 * @throws RuntimeException when the definition does not match either supported shape
 */
private function typeFromDefinition(Definition $definition) : string
{
    // Nullable pair: strip NullEntry and keep whatever remains first.
    if ($definition->isNullable() && \count($definition->types()) === 2) {
        $remainingTypes = \array_diff($definition->types(), [NullEntry::class]);

        /** @var class-string<Entry> $remainingType */
        $remainingType = \current($remainingTypes);

        return $remainingType;
    }

    // Anything other than exactly one type at this point is treated as an unsupported union.
    if (\count($definition->types()) !== 1) {
        throw new RuntimeException('Union types are not supported by Avro file format. Invalid type: ' . $definition->entry()->name());
    }

    return \current($definition->types());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,19 @@ public function test_writing_and_reading_avro_with_all_supported_types() : void
Entry::json_object('json_object', ['id' => 1, 'name' => 'test']),
Entry::json('json', [['id' => 1, 'name' => 'test'], ['id' => 2, 'name' => 'test']]),
Entry::list_of_string('list_of_strings', ['a', 'b', 'c']),
Entry::list_of_datetime('list_of_datetimes', [new \DateTimeImmutable(), new \DateTimeImmutable(), new \DateTimeImmutable()])
Entry::list_of_datetime('list_of_datetimes', [new \DateTimeImmutable(), new \DateTimeImmutable(), new \DateTimeImmutable()]),
Entry::structure(
'address',
Entry::string('street', 'street_' . $i),
Entry::string('city', 'city_' . $i),
Entry::string('zip', 'zip_' . $i),
Entry::string('country', 'country_' . $i),
Entry::structure(
'location',
Entry::float('lat', 1.5),
Entry::float('lon', 1.5)
)
),
);
}, \range(1, 100))
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use Flow\ETL\FlowContext;
use Flow\ETL\Row;
use JsonMachine\Items;
use JsonMachine\JsonDecoder\ExtJsonDecoder;

final class JsonExtractor implements Extractor
{
Expand Down Expand Up @@ -51,11 +52,13 @@ public function extract(FlowContext $context) : \Generator
}

/**
* @return array{pointer?: string}
* @return array{pointer?: string, decoder: ExtJsonDecoder}
*/
private function readerOptions() : array
{
$options = [];
$options = [
'decoder' => new ExtJsonDecoder(true),
];

if ($this->pointer !== null) {
$options['pointer'] = $this->pointer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ public function __unserialize(array $data) : void
public function closure(Rows $rows, FlowContext $context) : void
{
foreach ($context->streams() as $stream) {
$this->close($stream);
if ($stream->path()->extension() === 'json') {
$this->close($stream);
}
}

$context->streams()->close($this->path);
Expand Down
Loading

0 comments on commit 724b6aa

Please sign in to comment.