Skip to content

Commit

Permalink
Added DataFrame::until method (#808)
Browse files Browse the repository at this point in the history
  • Loading branch information
norberttech authored Nov 20, 2023
1 parent 9cdeb22 commit e26c8eb
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 4 deletions.
19 changes: 17 additions & 2 deletions src/core/etl/src/Flow/ETL/DataFrame.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use Flow\ETL\Filesystem\SaveMode;
use Flow\ETL\Formatter\AsciiTableFormatter;
use Flow\ETL\Function\AggregatingFunction;
use Flow\ETL\Function\ScalarFunction;
use Flow\ETL\Function\WindowFunction;
use Flow\ETL\Join\Expression;
use Flow\ETL\Join\Join;
Expand All @@ -36,6 +37,7 @@
use Flow\ETL\Transformer\ScalarFunctionFilterTransformer;
use Flow\ETL\Transformer\ScalarFunctionTransformer;
use Flow\ETL\Transformer\StyleConverter\StringStyles;
use Flow\ETL\Transformer\UntilTransformer;
use Flow\ETL\Transformer\WindowFunctionTransformer;

final class DataFrame
Expand Down Expand Up @@ -248,9 +250,9 @@ public function fetch(?int $limit = null) : Rows
/**
* @lazy
*/
public function filter(Function\ScalarFunction $callback) : self
public function filter(ScalarFunction $function) : self
{
$this->pipeline->add(new ScalarFunctionFilterTransformer($callback));
$this->pipeline->add(new ScalarFunctionFilterTransformer($function));

return $this;
}
Expand Down Expand Up @@ -677,6 +679,19 @@ public function transform(Transformer|Transformation $transformer) : self
return $transformer->transform($this);
}

/**
* The difference between filter and until is that filter will keep filtering rows until extractors finish yielding rows.
* Until will send a STOP signal to the Extractor when the condition is not met.
*
* @lazy
*/
public function until(ScalarFunction $function) : self
{
$this->pipeline->add(new UntilTransformer($function));

return $this;
}

/**
* @lazy
*
Expand Down
52 changes: 52 additions & 0 deletions src/core/etl/src/Flow/ETL/Transformer/UntilTransformer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
<?php declare(strict_types=1);

namespace Flow\ETL\Transformer;

use Flow\ETL\Exception\LimitReachedException;
use Flow\ETL\FlowContext;
use Flow\ETL\Function\ScalarFunction;
use Flow\ETL\Rows;
use Flow\ETL\Transformer;

/**
* @implements Transformer<array{function: ScalarFunction}>
*/
final class UntilTransformer implements Transformer
{
private bool $limitReached = false;

public function __construct(private readonly ScalarFunction $function)
{
}

public function __serialize() : array
{
return [
'function' => $this->function,
];
}

public function __unserialize(array $data) : void
{
$this->function = $data['function'];
}

public function transform(Rows $rows, FlowContext $context) : Rows
{
if ($this->limitReached) {
throw new LimitReachedException(0);
}

$nextRows = [];

foreach ($rows as $row) {
if (!$this->function->eval($row)) {
$this->limitReached = true;
} else {
$nextRows[] = $row;
}
}

return new Rows(...$nextRows);
}
}
35 changes: 33 additions & 2 deletions src/core/etl/tests/Flow/ETL/Tests/Unit/DataFrameTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -1730,6 +1730,39 @@ public function test_strict_validation_against_schema() : void
);
}

public function test_until() : void
{
$rows = (new Flow())
->read(From::chain(
From::array([
['id' => 1],
['id' => 2],
['id' => 3],
['id' => 4],
['id' => 5],
]),
From::array([
['id' => 6],
['id' => 7],
['id' => 8],
['id' => 9],
['id' => 10],
])
))
->until(ref('id')->lessThanEqual(lit(3)))
->fetch();

$this->assertCount(3, $rows);
$this->assertSame(
[
['id' => 1],
['id' => 2],
['id' => 3],
],
$rows->toArray()
);
}

public function test_void() : void
{
$rows = (new Flow())->process(
Expand Down Expand Up @@ -1785,7 +1818,6 @@ public function test_window_avg_function() : void
(new Flow)
->read(From::chain(From::memory($memoryPage1), From::memory($memoryPage2)))
->withEntry('avg_salary', average(ref('salary'))->over(window()->partitionBy(ref('department'))))
// ->withEntry('avg_salary', _Window::partitionBy(ref('department'))->orderBy(ref('salary')->desc())->avg(ref('salary')))
->select('department', 'avg_salary')
->dropDuplicates(ref('department'), ref('avg_salary'))
->withEntry('avg_salary', ref('avg_salary')->round(lit(0)))
Expand Down Expand Up @@ -1833,7 +1865,6 @@ public function test_window_rank_function() : void
->read(From::all(From::memory($memoryPage1), From::memory($memoryPage2)))
->dropDuplicates(ref('employee_name'), ref('department'))
->withEntry('rank', rank()->over(window()->partitionBy(ref('department'))->orderBy(ref('salary')->desc())))
// ->withEntry('rank', _Window::partitionBy(ref('department'))->orderBy(ref('salary')->desc())->rank())
->filter(ref('rank')->equals(lit(1)))
->fetch()
->toArray()
Expand Down

0 comments on commit e26c8eb

Please sign in to comment.