Skip to content

Commit

Permalink
Added support for partitioning in parquet loader (#712)
Browse files Browse the repository at this point in the history
  • Loading branch information
norberttech authored Nov 2, 2023
1 parent d996814 commit 69b0c81
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 165 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

namespace Flow\ETL\Adapter\Parquet;

use Flow\ETL\Exception\RuntimeException;
use Flow\ETL\Filesystem\Path;
use Flow\ETL\Filesystem\Stream\Mode;
use Flow\ETL\FlowContext;
Expand All @@ -11,6 +10,7 @@
use Flow\ETL\Row\Schema;
use Flow\ETL\Rows;
use Flow\Parquet\Options;
use Flow\Parquet\ParquetFile\Compressions;
use Flow\Parquet\Writer;

/**
Expand All @@ -24,11 +24,15 @@ final class ParquetLoader implements Closure, Loader, Loader\FileLoader

private ?Schema $inferredSchema = null;

private ?Writer $writer = null;
/**
* @var array<string, Writer>
*/
private array $writers = [];

public function __construct(
private readonly Path $path,
private readonly Options $options,
private readonly Compressions $compressions = Compressions::SNAPPY,
private readonly ?Schema $schema = null,
) {
$this->converter = new SchemaConverter();
Expand All @@ -52,9 +56,14 @@ public function __unserialize(array $data) : void

public function closure(Rows $rows, FlowContext $context) : void
{
$this->writer($context)->close();
if (\count($this->writers)) {
foreach ($this->writers as $writer) {
$writer->close();
}
}

$context->streams()->close($this->path);
$this->writer = null;
$this->writers = [];
}

public function destination() : Path
Expand All @@ -64,16 +73,45 @@ public function destination() : Path

public function load(Rows $rows, FlowContext $context) : void
{
if (\count($context->partitionEntries())) {
throw new RuntimeException('Partitioning is not supported yet');
}

if ($this->schema === null) {
$this->inferSchema($rows);
}

foreach ($rows as $row) {
$this->writer($context)->writeRow($row->toArray());
$streams = $context->streams();

if ($context->partitionEntries()->count()) {
foreach ($rows->partitionBy(...$context->partitionEntries()->all()) as $partitions) {

$stream = $streams->open($this->path, 'parquet', Mode::WRITE_BINARY, $context->threadSafe(), $partitions->partitions);

if (!\array_key_exists($stream->path()->uri(), $this->writers)) {
$this->writers[$stream->path()->uri()] = new Writer(
compression: $this->compressions,
options: $this->options
);

$this->writers[$stream->path()->uri()]->openForStream($stream->resource(), $this->converter->toParquet($this->schema()));
}

foreach ($partitions->rows as $row) {
$this->writers[$stream->path()->uri()]->writeRow($row->toArray());
}
}
} else {
$stream = $streams->open($this->path, 'parquet', Mode::WRITE_BINARY, $context->threadSafe());

if (!\array_key_exists($stream->path()->uri(), $this->writers)) {
$this->writers[$stream->path()->uri()] = new Writer(
compression: $this->compressions,
options: $this->options
);

$this->writers[$stream->path()->uri()]->openForStream($stream->resource(), $this->converter->toParquet($this->schema()));
}

foreach ($rows as $row) {
$this->writers[$stream->path()->uri()]->writeRow($row->toArray());
}
}
}

Expand All @@ -95,26 +133,4 @@ private function schema() : Schema
/** @phpstan-ignore-next-line */
return $this->schema ?? $this->inferredSchema;
}

private function writer(FlowContext $context) : Writer
{
if ($this->writer !== null) {
return $this->writer;
}

$this->writer = new Writer(
options: $this->options
);
$this->writer->openForStream(
$context->streams()->open(
$this->path,
'parquet',
Mode::WRITE_BINARY,
$context->threadSafe()
)->resource(),
$this->converter->toParquet($this->schema())
);

return $this->writer;
}
}
3 changes: 3 additions & 0 deletions src/adapter/etl-adapter-parquet/src/Flow/ETL/DSL/Parquet.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
use Flow\ETL\Row\Schema;
use Flow\Parquet\ByteOrder;
use Flow\Parquet\Options;
use Flow\Parquet\ParquetFile\Compressions;

/**
* @infection-ignore-all
Expand Down Expand Up @@ -65,6 +66,7 @@ final public static function from(
final public static function to(
string|Path $path,
?Options $options = null,
Compressions $compressions = Compressions::SNAPPY,
?Schema $schema = null,
) : Loader {
if ($options === null) {
Expand All @@ -74,6 +76,7 @@ final public static function to(
return new ParquetLoader(
\is_string($path) ? Path::realpath($path) : $path,
$options,
$compressions,
$schema,
);
}
Expand Down
Loading

0 comments on commit 69b0c81

Please sign in to comment.