Skip to content

Commit

Permalink
DataFrame - batch size (#720)
Browse files Browse the repository at this point in the history
* Added batchSize method to DataFrame

* Updated UPGRADE.md
  • Loading branch information
norberttech committed Nov 3, 2023
1 parent 247bd2c commit 85e3db4
Show file tree
Hide file tree
Showing 8 changed files with 306 additions and 114 deletions.
4 changes: 4 additions & 0 deletions UPGRADE.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ Which does exactly the same thing as BufferLoader did, but in a more generic way
Pipeline Closure was reduced to be only Loader Closure and it was moved to \Flow\ETL\Loader namespace.
Additionally, \Closure::close method no longer requires Rows to be passed as an argument.

### 5) Parallelize

The DataFrame::parallelize() method is deprecated and will be removed in a future release; use DataFrame::batchSize(int $size) instead.

---

## Upgrading from 0.3.x to 0.4.x
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public function test_writing_and_reading_avro_with_all_supported_types() : void
}, \range(1, 100))
)
))
->parallelize(10)
->batchSize(10)
->write(Avro::to($path))
->run();

Expand Down
32 changes: 26 additions & 6 deletions src/core/etl/src/Flow/ETL/DataFrame.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
use Flow\ETL\Join\Join;
use Flow\ETL\Loader\SchemaValidationLoader;
use Flow\ETL\Loader\StreamLoader\Output;
use Flow\ETL\Pipeline\BatchingPipeline;
use Flow\ETL\Pipeline\CachingPipeline;
use Flow\ETL\Pipeline\CollectingPipeline;
use Flow\ETL\Pipeline\GroupByPipeline;
Expand Down Expand Up @@ -68,6 +69,25 @@ public function aggregate(Aggregation ...$aggregations) : self
return $this;
}

/**
 * Re-chunk Rows yielded by the Extractor so that each batch passed to the next
 * pipeline element holds up to $size rows (the last batch may be smaller).
 *
 * - When the Extractor yields one row at a time, rows are merged into batches of $size.
 * - When the Extractor yields bigger batches, they are split into batches of $size.
 *
 * To merge all Rows into a single batch, use DataFrame::collect() instead.
 *
 * @param int<1, max> $size number of rows in a single batch
 *
 * @throws \Flow\ETL\Exception\InvalidArgumentException when $size is not greater than 0
 *
 * @lazy
 */
public function batchSize(int $size) : self
{
    // Wrap the pipeline built so far; everything added afterwards receives re-chunked Rows.
    $batching = new BatchingPipeline($this->pipeline, $size);
    $this->pipeline = $batching;

    return $this;
}

/**
* Start processing rows up to this moment and put each instance of Rows
* into previously defined cache.
Expand All @@ -86,16 +106,14 @@ public function cache(?string $id = null) : self
}

/**
* Before transforming rows, collect them into batches of given size.
* When batch size is not specific, all rows are going to be first collected into memory and then processed.
*
* @param null|int<1, max> $batchSize
* Before transforming rows, collect them and merge them into a single Rows instance.
* This might lead to memory issues when processing a large number of rows; use with caution.
*
* @lazy
*/
public function collect(?int $batchSize = null) : self
public function collect() : self
{
$this->pipeline = new CollectingPipeline($this->pipeline, $batchSize);
$this->pipeline = new CollectingPipeline($this->pipeline);

return $this;
}
Expand Down Expand Up @@ -430,6 +448,8 @@ public function onError(ErrorHandler $handler) : self
}

/**
* @deprecated - use DataFrame::batchSize() instead
*
* Keep extracting rows and passing them through all transformers up to this point.
* From here each transformed Row is divided and pushed forward to following pipeline elements.
*
Expand Down
82 changes: 82 additions & 0 deletions src/core/etl/src/Flow/ETL/Pipeline/BatchingPipeline.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
<?php declare(strict_types=1);

namespace Flow\ETL\Pipeline;

use Flow\ETL\DSL\From;
use Flow\ETL\Exception\InvalidArgumentException;
use Flow\ETL\Extractor;
use Flow\ETL\FlowContext;
use Flow\ETL\Loader;
use Flow\ETL\Pipeline;
use Flow\ETL\Transformer;

/**
 * Pipeline decorator that re-chunks Rows produced by the wrapped (upstream) pipeline
 * into batches of a fixed size before handing them over to a downstream pipeline.
 *
 * Extractors are attached to the upstream pipeline, while every Loader/Transformer added
 * through this decorator is registered on the downstream pipeline obtained from
 * `$pipeline->cleanCopy()`, so it only ever sees the re-chunked Rows.
 */
final class BatchingPipeline implements Pipeline
{
    /**
     * Downstream pipeline, fed with the re-chunked output of the upstream pipeline.
     */
    private readonly Pipeline $nextPipeline;

    /**
     * @param Pipeline $pipeline upstream pipeline whose output is going to be re-chunked
     * @param int<1, max> $size number of rows in a single batch
     *
     * @throws InvalidArgumentException when $size is not greater than 0
     */
    public function __construct(private readonly Pipeline $pipeline, private readonly int $size)
    {
        // Validate before doing anything else, so an invalid size never triggers
        // any work on the upstream pipeline.
        /**
         * @psalm-suppress DocblockTypeContradiction
         *
         * @phpstan-ignore-next-line
         */
        if ($this->size <= 0) {
            throw new InvalidArgumentException('Batch size must be greater than 0, given: ' . $this->size);
        }

        $this->nextPipeline = $pipeline->cleanCopy();
    }

    /**
     * Register a Loader/Transformer on the downstream pipeline, so that it receives batched Rows.
     */
    public function add(Loader|Transformer $pipe) : self
    {
        $this->nextPipeline->add($pipe);

        return $this;
    }

    /**
     * Return a clean copy of the upstream pipeline.
     * Note that the result comes straight from the wrapped pipeline; it is not wrapped in a new BatchingPipeline.
     */
    public function cleanCopy() : Pipeline
    {
        return $this->pipeline->cleanCopy();
    }

    /**
     * Delegate closure to the upstream pipeline.
     * Pipes registered through add() live in the downstream pipeline and are not reached from here.
     */
    public function closure(FlowContext $context) : void
    {
        $this->pipeline->closure($context);
    }

    /**
     * Check whether the upstream pipeline has the given transformer.
     * Pipes registered through add() live in the downstream pipeline and are not consulted.
     */
    public function has(string $transformerClass) : bool
    {
        return $this->pipeline->has($transformerClass);
    }

    /**
     * Report whether the upstream pipeline is asynchronous.
     */
    public function isAsync() : bool
    {
        return $this->pipeline->isAsync();
    }

    /**
     * Use the upstream pipeline, chunked into batches of the configured size, as the source
     * of the downstream pipeline and return the downstream pipeline's generator.
     */
    public function process(FlowContext $context) : \Generator
    {
        $this->nextPipeline->source(
            From::chunks_from(
                From::pipeline($this->pipeline),
                $this->size
            )
        );

        return $this->nextPipeline->process($context);
    }

    /**
     * Attach the extractor to the upstream pipeline; its output is what gets re-chunked.
     */
    public function source(Extractor $extractor) : self
    {
        $this->pipeline->source($extractor);

        return $this;
    }
}
29 changes: 4 additions & 25 deletions src/core/etl/src/Flow/ETL/Pipeline/CollectingPipeline.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,12 @@ final class CollectingPipeline implements Pipeline

/**
* @param Pipeline $pipeline
* @param null|int<1, max> $batchSize
*
* @throws InvalidArgumentException
*/
public function __construct(private readonly Pipeline $pipeline, private readonly ?int $batchSize = null)
public function __construct(private readonly Pipeline $pipeline)
{
$this->nextPipeline = $pipeline->cleanCopy();

if ($this->batchSize !== null) {
/**
* @psalm-suppress DocblockTypeContradiction
*
* @phpstan-ignore-next-line
*/
if ($this->batchSize <= 0) {
throw new InvalidArgumentException('Batch size must be greater than 0, given: ' . $this->batchSize);
}
}
}

public function add(Loader|Transformer $pipe) : self
Expand Down Expand Up @@ -71,18 +59,9 @@ public function isAsync() : bool

public function process(FlowContext $context) : \Generator
{
if ($this->batchSize === null) {
$this->nextPipeline->source(From::rows(
(new Rows())->merge(...\iterator_to_array($this->pipeline->process($context)))
));
} else {
$this->nextPipeline->source(
From::chunks_from(
From::pipeline($this->pipeline),
$this->batchSize
)
);
}
$this->nextPipeline->source(From::rows(
(new Rows())->merge(...\iterator_to_array($this->pipeline->process($context)))
));

return $this->nextPipeline->process($context);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
<?php declare(strict_types=1);

namespace Flow\ETL\Tests\Integration\Pipeline;

use Flow\ETL\Config;
use Flow\ETL\DSL\From;
use Flow\ETL\FlowContext;
use Flow\ETL\Pipeline\BatchingPipeline;
use Flow\ETL\Pipeline\SynchronousPipeline;
use Flow\ETL\Rows;
use PHPUnit\Framework\TestCase;

/**
 * Integration tests for BatchingPipeline: verifies how Rows coming from the source
 * are merged or split into batches of the requested size.
 */
final class BatchingPipelineTest extends TestCase
{
    /**
     * Two extractors yielding 5 rows each, batch size 10: everything is merged into one batch.
     */
    public function test_batching_rows() : void
    {
        $pipeline = new BatchingPipeline(new SynchronousPipeline(), size: 10);
        $pipeline->source(From::chain(
            From::array(self::idRows(1, 5)),
            From::array(self::idRows(6, 10))
        ));

        $this->assertCount(1, $this->batches($pipeline));
    }

    /**
     * 10 rows, batch size 7: a full batch of 7 followed by the remaining 3 rows.
     */
    public function test_that_rows_are_not_lost() : void
    {
        $pipeline = new BatchingPipeline(new SynchronousPipeline(), size: 7);
        $pipeline->source(From::chain(
            From::array(self::idRows(1, 10))
        ));

        $this->assertEquals(
            [
                [
                    ['id' => 1],
                    ['id' => 2],
                    ['id' => 3],
                    ['id' => 4],
                    ['id' => 5],
                    ['id' => 6],
                    ['id' => 7],
                ],
                [
                    ['id' => 8],
                    ['id' => 9],
                    ['id' => 10],
                ],
            ],
            \array_map(
                static fn (Rows $r) => $r->toArray(),
                $this->batches($pipeline)
            )
        );
    }

    /**
     * Batch size (11) bigger than the total number of rows (10): a single batch is produced.
     */
    public function test_using_bigger_batch_size_than_total_number_of_rows() : void
    {
        $pipeline = new BatchingPipeline(new SynchronousPipeline(), size: 11);
        $pipeline->source(From::chain(
            From::array(self::idRows(1, 5)),
            From::array(self::idRows(6, 10))
        ));

        $this->assertCount(1, $this->batches($pipeline));
    }

    /**
     * 10 rows, batch size 5: rows are split into two batches.
     */
    public function test_using_smaller_batch_size_than_total_number_of_rows() : void
    {
        $pipeline = new BatchingPipeline(new SynchronousPipeline(), size: 5);
        $pipeline->source(From::chain(
            From::array(self::idRows(1, 10))
        ));

        $this->assertCount(2, $this->batches($pipeline));
    }

    /**
     * Build fixture data: one ['id' => N] entry for every N in the inclusive range $from..$to.
     *
     * @return array<int, array{id: int}>
     */
    private static function idRows(int $from, int $to) : array
    {
        return \array_map(
            static fn (int $id) : array => ['id' => $id],
            \range($from, $to)
        );
    }

    /**
     * Run the pipeline with the default configuration and collect every yielded batch.
     *
     * @return array<array-key, Rows>
     */
    private function batches(BatchingPipeline $pipeline) : array
    {
        return \iterator_to_array($pipeline->process(new FlowContext(Config::default())));
    }
}
Loading

0 comments on commit 85e3db4

Please sign in to comment.