diff --git a/src/adapter/etl-adapter-json/src/Flow/ETL/Adapter/JSON/JsonLoader.php b/src/adapter/etl-adapter-json/src/Flow/ETL/Adapter/JSON/JsonLoader.php index bf7cbfc04..57cbf3716 100644 --- a/src/adapter/etl-adapter-json/src/Flow/ETL/Adapter/JSON/JsonLoader.php +++ b/src/adapter/etl-adapter-json/src/Flow/ETL/Adapter/JSON/JsonLoader.php @@ -6,7 +6,7 @@ use Flow\ETL\Exception\RuntimeException; use Flow\ETL\Loader\Closure; -use Flow\ETL\{FlowContext, Loader, Rows}; +use Flow\ETL\{Adapter\JSON\RowsNormalizer\EntryNormalizer, FlowContext, Loader, Rows}; use Flow\Filesystem\{DestinationStream, Partition, Path}; final class JsonLoader implements Closure, Loader, Loader\FileLoader @@ -16,8 +16,10 @@ final class JsonLoader implements Closure, Loader, Loader\FileLoader */ private array $writes = []; - public function __construct(private readonly Path $path) - { + public function __construct( + private readonly Path $path, + private readonly string $dateTimeFormat = \DateTimeInterface::ATOM + ) { if ($this->path->isPattern()) { throw new \InvalidArgumentException("JsonLoader path can't be pattern, given: " . $this->path->path()); } @@ -54,6 +56,7 @@ public function load(Rows $rows, FlowContext $context) : void public function write(Rows $nextRows, array $partitions, FlowContext $context) : void { $streams = $context->streams(); + $normalizer = new RowsNormalizer(new EntryNormalizer($context->config->caster(), $this->dateTimeFormat)); if (!$streams->isOpen($this->path, $partitions)) { $stream = $streams->writeTo($this->path, $partitions); @@ -67,7 +70,7 @@ public function write(Rows $nextRows, array $partitions, FlowContext $context) : $stream = $streams->writeTo($this->path, $partitions); } - $this->writeJSON($nextRows, $stream); + $this->writeJSON($nextRows, $stream, $normalizer); } /** @@ -77,16 +80,18 @@ public function write(Rows $nextRows, array $partitions, FlowContext $context) : * @throws RuntimeException * @throws \JsonException */ - public function writeJSON(Rows $rows, DestinationStream $stream) : void + public function writeJSON(Rows $rows, DestinationStream $stream, RowsNormalizer $normalizer) : void { if (!\count($rows)) { return; } - $json = \substr(\substr(\json_encode($rows->toArray(), JSON_THROW_ON_ERROR), 0, -1), 1); - $json = ($this->writes[$stream->path()->path()] > 0) ? ',' . $json : $json; + foreach ($normalizer->normalize($rows) as $normalizedRow) { + $json = json_encode($normalizedRow, JSON_THROW_ON_ERROR); + $json = ($this->writes[$stream->path()->path()] > 0) ? ',' . $json : $json; - $stream->append($json); + $stream->append($json); + } $this->writes[$stream->path()->path()]++; } diff --git a/src/adapter/etl-adapter-json/src/Flow/ETL/Adapter/JSON/RowsNormalizer.php b/src/adapter/etl-adapter-json/src/Flow/ETL/Adapter/JSON/RowsNormalizer.php new file mode 100644 index 000000000..d8a398f4a --- /dev/null +++ b/src/adapter/etl-adapter-json/src/Flow/ETL/Adapter/JSON/RowsNormalizer.php @@ -0,0 +1,32 @@ +> + */ + public function normalize(Rows $rows) : \Generator + { + foreach ($rows as $row) { + $normalizedRow = []; + + foreach ($row->entries() as $entry) { + $normalizedRow[$entry->name()] = $this->normalizer->normalize($entry); + } + + yield $normalizedRow; + } + } +} diff --git a/src/adapter/etl-adapter-json/src/Flow/ETL/Adapter/JSON/RowsNormalizer/EntryNormalizer.php b/src/adapter/etl-adapter-json/src/Flow/ETL/Adapter/JSON/RowsNormalizer/EntryNormalizer.php new file mode 100644 index 000000000..b99d65f97 --- /dev/null +++ b/src/adapter/etl-adapter-json/src/Flow/ETL/Adapter/JSON/RowsNormalizer/EntryNormalizer.php @@ -0,0 +1,34 @@ + $entry->toString(), + Entry\DateTimeEntry::class => $entry->value()?->format($this->dateTimeFormat), + Entry\EnumEntry::class => $entry->value()?->name, + Entry\ArrayEntry::class, + Entry\ListEntry::class, + Entry\MapEntry::class, + Entry\StructureEntry::class, + Entry\JsonEntry::class, + Entry\ObjectEntry::class => $this->caster->to(type_array())->value($entry->value()), + default => $entry->value(), + }; + } +} diff --git a/src/adapter/etl-adapter-json/src/Flow/ETL/Adapter/JSON/functions.php b/src/adapter/etl-adapter-json/src/Flow/ETL/Adapter/JSON/functions.php index 91f2c127a..d898f34c7 100644 --- a/src/adapter/etl-adapter-json/src/Flow/ETL/Adapter/JSON/functions.php +++ b/src/adapter/etl-adapter-json/src/Flow/ETL/Adapter/JSON/functions.php @@ -47,9 +47,7 @@ function from_json( * * @return Loader */ -function to_json(string|Path $path) : Loader +function to_json(string|Path $path, string $date_time_format = \DateTimeInterface::ATOM) : Loader { - return new JsonLoader( - \is_string($path) ? Path::realpath($path) : $path, - ); + return new JsonLoader(\is_string($path) ? Path::realpath($path) : $path, $date_time_format); }