DataFrame - batch size #720

Merged · 2 commits · Nov 3, 2023
4 changes: 4 additions & 0 deletions UPGRADE.md
@@ -26,6 +26,10 @@ Which does exactly the same thing as BufferLoader did, but in a more generic way
Pipeline Closure was reduced to be only Loader Closure and it was moved to \Flow\ETL\Loader namespace.
Additionally, \Closure::close method no longer requires Rows to be passed as an argument.

### 5) Parallelize

The DataFrame::parallelize() method is deprecated and will be removed; use the DataFrame::batchSize(int $size) method instead.
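
For example, an existing pipeline migrates with a one-line change (a minimal sketch; `$dataFrame` stands for any lazily built DataFrame):

```php
// Before (deprecated):
$dataFrame->parallelize(10);

// After:
$dataFrame->batchSize(10);
```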

---

## Upgrading from 0.3.x to 0.4.x
@@ -130,7 +130,7 @@ public function test_writing_and_reading_avro_with_all_supported_types() : void
}, \range(1, 100))
)
))
->parallelize(10)
->batchSize(10)
->write(Avro::to($path))
->run();

32 changes: 26 additions & 6 deletions src/core/etl/src/Flow/ETL/DataFrame.php
@@ -14,6 +14,7 @@
use Flow\ETL\Join\Join;
use Flow\ETL\Loader\SchemaValidationLoader;
use Flow\ETL\Loader\StreamLoader\Output;
use Flow\ETL\Pipeline\BatchingPipeline;
use Flow\ETL\Pipeline\CachingPipeline;
use Flow\ETL\Pipeline\CollectingPipeline;
use Flow\ETL\Pipeline\GroupByPipeline;
@@ -68,6 +69,25 @@ public function aggregate(Aggregation ...$aggregations) : self
return $this;
}

/**
* Merge/split Rows yielded by the Extractor into batches of the given size.
* For example, when the Extractor yields one row at a time, this method merges rows into batches of the given size
* before passing them to the next pipeline element.
* Similarly, when the Extractor yields larger batches of rows, this method splits them into smaller batches of the given size.
*
* To merge all Rows into a single batch, use the DataFrame::collect() method.
*
* @param int<1, max> $size
*
* @lazy
*/
public function batchSize(int $size) : self
{
$this->pipeline = new BatchingPipeline($this->pipeline, $size);

return $this;
}

/**
* Start processing rows up to this moment and put each instance of Rows
* into previously defined cache.
@@ -86,16 +106,14 @@ public function cache(?string $id = null) : self
}

/**
* Before transforming rows, collect them into batches of given size.
* When batch size is not specific, all rows are going to be first collected into memory and then processed.
*
* @param null|int<1, max> $batchSize
* Before transforming rows, collect them and merge them into a single Rows instance.
* This might lead to memory issues when processing a large number of rows, so use with caution.
*
* @lazy
*/
public function collect(?int $batchSize = null) : self
public function collect() : self
{
$this->pipeline = new CollectingPipeline($this->pipeline, $batchSize);
$this->pipeline = new CollectingPipeline($this->pipeline);

return $this;
}
@@ -430,6 +448,8 @@ public function onError(ErrorHandler $handler) : self
}

/**
* @deprecated - use DataFrame::batchSize() instead
*
* Keep extracting rows and passing them through all transformers up to this point.
* From here each transformed Row is divided and pushed forward to following pipeline elements.
*
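
Usage of the new method mirrors the deprecated one, as the Avro test change above shows. A hedged end-to-end sketch (`$dataFrame` and `$path` are illustrative; constructing the DataFrame is outside this diff):

```php
$dataFrame
    ->batchSize(10)           // regroup upstream rows into Rows of at most 10 rows each
    ->write(Avro::to($path))  // any loader works here; Avro::to() is the one used in the test above
    ->run();
```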
82 changes: 82 additions & 0 deletions src/core/etl/src/Flow/ETL/Pipeline/BatchingPipeline.php
@@ -0,0 +1,82 @@
<?php declare(strict_types=1);

namespace Flow\ETL\Pipeline;

use Flow\ETL\DSL\From;
use Flow\ETL\Exception\InvalidArgumentException;
use Flow\ETL\Extractor;
use Flow\ETL\FlowContext;
use Flow\ETL\Loader;
use Flow\ETL\Pipeline;
use Flow\ETL\Transformer;

final class BatchingPipeline implements Pipeline
{
private readonly Pipeline $nextPipeline;

/**
* @param Pipeline $pipeline
* @param int<1, max> $size
*
* @throws InvalidArgumentException
*/
public function __construct(private readonly Pipeline $pipeline, private readonly int $size)
{
$this->nextPipeline = $pipeline->cleanCopy();

/**
* @psalm-suppress DocblockTypeContradiction
*
* @phpstan-ignore-next-line
*/
if ($this->size <= 0) {
throw new InvalidArgumentException('Batch size must be greater than 0, given: ' . $this->size);
}
}

public function add(Loader|Transformer $pipe) : self
{
$this->nextPipeline->add($pipe);

return $this;
}

public function cleanCopy() : Pipeline
{
return $this->pipeline->cleanCopy();
}

public function closure(FlowContext $context) : void
{
$this->pipeline->closure($context);
}

public function has(string $transformerClass) : bool
{
return $this->pipeline->has($transformerClass);
}

public function isAsync() : bool
{
return $this->pipeline->isAsync();
}

public function process(FlowContext $context) : \Generator
{
$this->nextPipeline->source(
From::chunks_from(
From::pipeline($this->pipeline),
$this->size
)
);

return $this->nextPipeline->process($context);
}

public function source(Extractor $extractor) : self
{
$this->pipeline->source($extractor);

return $this;
}
}
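
Conceptually, the re-batching that `From::chunks_from()` performs inside `process()` can be pictured as a generator that buffers incoming rows and flushes full chunks. A simplified, self-contained sketch using plain arrays instead of the library's Rows type (not the library implementation):

```php
<?php declare(strict_types=1);

/**
 * Simplified sketch of chunked re-batching: buffer incoming rows and yield
 * them in groups of $size, emitting any remainder as a final, smaller batch.
 *
 * @param iterable<array<string, mixed>> $rows
 *
 * @return \Generator<int, array<int, array<string, mixed>>>
 */
function chunked(iterable $rows, int $size) : \Generator
{
    $buffer = [];

    foreach ($rows as $row) {
        $buffer[] = $row;

        if (\count($buffer) === $size) {
            yield $buffer; // a full batch of exactly $size rows
            $buffer = [];
        }
    }

    if ($buffer !== []) {
        yield $buffer; // leftover rows, fewer than $size
    }
}
```

This is why, in the `test_that_rows_are_not_lost` test below, ten rows batched by seven arrive as one batch of seven followed by one batch of three.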
29 changes: 4 additions & 25 deletions src/core/etl/src/Flow/ETL/Pipeline/CollectingPipeline.php
@@ -22,24 +22,12 @@ final class CollectingPipeline implements Pipeline

/**
* @param Pipeline $pipeline
* @param null|int<1, max> $batchSize
*
* @throws InvalidArgumentException
*/
public function __construct(private readonly Pipeline $pipeline, private readonly ?int $batchSize = null)
public function __construct(private readonly Pipeline $pipeline)
{
$this->nextPipeline = $pipeline->cleanCopy();

if ($this->batchSize !== null) {
/**
* @psalm-suppress DocblockTypeContradiction
*
* @phpstan-ignore-next-line
*/
if ($this->batchSize <= 0) {
throw new InvalidArgumentException('Batch size must be greater than 0, given: ' . $this->batchSize);
}
}
}

public function add(Loader|Transformer $pipe) : self
@@ -71,18 +59,9 @@ public function isAsync() : bool

public function process(FlowContext $context) : \Generator
{
if ($this->batchSize === null) {
$this->nextPipeline->source(From::rows(
(new Rows())->merge(...\iterator_to_array($this->pipeline->process($context)))
));
} else {
$this->nextPipeline->source(
From::chunks_from(
From::pipeline($this->pipeline),
$this->batchSize
)
);
}
$this->nextPipeline->source(From::rows(
(new Rows())->merge(...\iterator_to_array($this->pipeline->process($context)))
));

return $this->nextPipeline->process($context);
}
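
With this change, `collect()` always materializes the whole upstream and merges it into a single Rows instance, while bounded grouping is now the job of `batchSize()`. A short sketch of the distinction (the `$dataFrame` variable and row counts are illustrative):

```php
// Ten upstream rows with batchSize(5): downstream pipeline elements
// receive two Rows instances of five rows each.
$dataFrame->batchSize(5);

// Ten upstream rows with collect(): downstream pipeline elements receive
// a single Rows instance of ten rows, held entirely in memory.
$dataFrame->collect();
```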
@@ -0,0 +1,132 @@
<?php declare(strict_types=1);

namespace Flow\ETL\Tests\Integration\Pipeline;

use Flow\ETL\Config;
use Flow\ETL\DSL\From;
use Flow\ETL\FlowContext;
use Flow\ETL\Pipeline\BatchingPipeline;
use Flow\ETL\Pipeline\SynchronousPipeline;
use Flow\ETL\Rows;
use PHPUnit\Framework\TestCase;

final class BatchingPipelineTest extends TestCase
{
public function test_batching_rows() : void
{
$pipeline = new BatchingPipeline(new SynchronousPipeline(), size: 10);
$pipeline->source(From::chain(
From::array([
['id' => 1],
['id' => 2],
['id' => 3],
['id' => 4],
['id' => 5],
]),
From::array([
['id' => 6],
['id' => 7],
['id' => 8],
['id' => 9],
['id' => 10],
])
));

$this->assertCount(
1,
\iterator_to_array($pipeline->process(new FlowContext(Config::default())))
);
}

public function test_that_rows_are_not_lost() : void
{
$pipeline = new BatchingPipeline(new SynchronousPipeline(), $batchSize = 7);
$pipeline->source(From::chain(
From::array([
['id' => 1],
['id' => 2],
['id' => 3],
['id' => 4],
['id' => 5],
['id' => 6],
['id' => 7],
['id' => 8],
['id' => 9],
['id' => 10],
])
));

$this->assertEquals(
[
[
['id' => 1],
['id' => 2],
['id' => 3],
['id' => 4],
['id' => 5],
['id' => 6],
['id' => 7],
],
[
['id' => 8],
['id' => 9],
['id' => 10],
],
],
\array_map(
static fn (Rows $r) => $r->toArray(),
\iterator_to_array($pipeline->process(new FlowContext(Config::default())))
)
);
}

public function test_using_bigger_batch_size_than_total_number_of_rows() : void
{
$pipeline = new BatchingPipeline(new SynchronousPipeline(), size: 11);
$pipeline->source(From::chain(
From::array([
['id' => 1],
['id' => 2],
['id' => 3],
['id' => 4],
['id' => 5],
]),
From::array([
['id' => 6],
['id' => 7],
['id' => 8],
['id' => 9],
['id' => 10],
])
));

$this->assertCount(
1,
\iterator_to_array($pipeline->process(new FlowContext(Config::default())))
);
}

public function test_using_smaller_batch_size_than_total_number_of_rows() : void
{
$pipeline = new BatchingPipeline(new SynchronousPipeline(), size: 5);
$pipeline->source(From::chain(
From::array([
['id' => 1],
['id' => 2],
['id' => 3],
['id' => 4],
['id' => 5],
['id' => 6],
['id' => 7],
['id' => 8],
['id' => 9],
['id' => 10],
])
));

$this->assertCount(
2,
\iterator_to_array($pipeline->process(new FlowContext(Config::default())))
);
}
}