Skip to content

Commit

Permalink
Automate normalization of rows in JsonLoader
Browse files Browse the repository at this point in the history
  • Loading branch information
norberttech committed Aug 9, 2024
1 parent a7c3654 commit d87474d
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

use Flow\ETL\Exception\RuntimeException;
use Flow\ETL\Loader\Closure;
use Flow\ETL\{FlowContext, Loader, Rows};
use Flow\ETL\{Adapter\JSON\RowsNormalizer\EntryNormalizer, FlowContext, Loader, Rows};
use Flow\Filesystem\{DestinationStream, Partition, Path};

final class JsonLoader implements Closure, Loader, Loader\FileLoader
Expand All @@ -16,8 +16,10 @@ final class JsonLoader implements Closure, Loader, Loader\FileLoader
*/
private array $writes = [];

public function __construct(private readonly Path $path)
{
public function __construct(
private readonly Path $path,
private readonly string $dateTimeFormat = \DateTimeInterface::ATOM
) {
if ($this->path->isPattern()) {
throw new \InvalidArgumentException("JsonLoader path can't be pattern, given: " . $this->path->path());
}
Expand Down Expand Up @@ -54,6 +56,7 @@ public function load(Rows $rows, FlowContext $context) : void
public function write(Rows $nextRows, array $partitions, FlowContext $context) : void
{
$streams = $context->streams();
$normalizer = new RowsNormalizer(new EntryNormalizer($context->config->caster(), $this->dateTimeFormat));

if (!$streams->isOpen($this->path, $partitions)) {
$stream = $streams->writeTo($this->path, $partitions);
Expand All @@ -67,7 +70,7 @@ public function write(Rows $nextRows, array $partitions, FlowContext $context) :
$stream = $streams->writeTo($this->path, $partitions);
}

$this->writeJSON($nextRows, $stream);
$this->writeJSON($nextRows, $stream, $normalizer);
}

/**
Expand All @@ -77,16 +80,18 @@ public function write(Rows $nextRows, array $partitions, FlowContext $context) :
* @throws RuntimeException
* @throws \JsonException
*/
public function writeJSON(Rows $rows, DestinationStream $stream) : void
public function writeJSON(Rows $rows, DestinationStream $stream, RowsNormalizer $normalizer) : void
{
if (!\count($rows)) {
return;
}

$json = \substr(\substr(\json_encode($rows->toArray(), JSON_THROW_ON_ERROR), 0, -1), 1);
$json = ($this->writes[$stream->path()->path()] > 0) ? ',' . $json : $json;
foreach ($normalizer->normalize($rows) as $normalizedRow) {
$json = json_encode($normalizedRow, JSON_THROW_ON_ERROR);
$json = ($this->writes[$stream->path()->path()] > 0) ? ',' . $json : $json;

$stream->append($json);
$stream->append($json);
}

$this->writes[$stream->path()->path()]++;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Adapter\JSON;

use Flow\ETL\Adapter\JSON\RowsNormalizer\EntryNormalizer;
use Flow\ETL\Rows;

final class RowsNormalizer
{
public function __construct(private readonly EntryNormalizer $normalizer)
{

}

/**
* @return \Generator<array<string, null|array|bool|float|int|string>>
*/
public function normalize(Rows $rows) : \Generator
{
foreach ($rows as $row) {
$normalizedRow = [];

foreach ($row->entries() as $entry) {
$normalizedRow[$entry->name()] = $this->normalizer->normalize($entry);
}

yield $normalizedRow;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Adapter\JSON\RowsNormalizer;

use function Flow\ETL\DSL\type_array;
use Flow\ETL\PHP\Type\Caster;
use Flow\ETL\Row\Entry;

final class EntryNormalizer
{
public function __construct(
private readonly Caster $caster,
private readonly string $dateTimeFormat = \DateTimeInterface::ATOM
) {
}

public function normalize(Entry $entry) : string|float|int|bool|array|null
{
return match ($entry::class) {
Entry\UuidEntry::class => $entry->toString(),
Entry\DateTimeEntry::class => $entry->value()?->format($this->dateTimeFormat),
Entry\EnumEntry::class => $entry->value()?->name,
Entry\ArrayEntry::class,
Entry\ListEntry::class,
Entry\MapEntry::class,
Entry\StructureEntry::class,
Entry\JsonEntry::class,
Entry\ObjectEntry::class => $this->caster->to(type_array())->value($entry->value()),
default => $entry->value(),
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,7 @@ function from_json(
*
* @return Loader
*/
function to_json(string|Path $path) : Loader
function to_json(string|Path $path, string $date_time_format = \DateTimeInterface::ATOM) : Loader
{
return new JsonLoader(
\is_string($path) ? Path::realpath($path) : $path,
);
return new JsonLoader(\is_string($path) ? Path::realpath($path) : $path, $date_time_format);
}

0 comments on commit d87474d

Please sign in to comment.