Skip to content

Commit

Permalink
Added onEach scalar function (#1168)
Browse files Browse the repository at this point in the history
  • Loading branch information
norberttech committed Aug 4, 2024
1 parent 75fa0e1 commit 897a13b
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 1 deletion.
50 changes: 50 additions & 0 deletions src/core/etl/src/Flow/ETL/Function/OnEach.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Function;

use function Flow\ETL\DSL\array_to_row;
use Flow\ETL\Exception\InvalidArgumentException;
use Flow\ETL\Row;

final class OnEach extends ScalarFunctionChain
{
public function __construct(
private readonly ScalarFunction $ref,
private readonly ScalarFunction $function,
private readonly ScalarFunction|bool $preserveKeys = true
) {
}

public function eval(Row $row) : mixed
{
$value = $this->ref->eval($row);

if (!\is_array($value)) {
return null;
}

$preserveKeys = \is_bool($this->preserveKeys) ? $this->preserveKeys : (bool) $this->preserveKeys->eval($row);

$output = [];

foreach ($value as $key => $item) {
if ($preserveKeys) {
try {
$output[$key] = $this->function->eval(array_to_row(['element' => $item]));
} catch (InvalidArgumentException $e) {
$output[$key] = null;
}
} else {
try {
$output[] = $this->function->eval(array_to_row(['element' => $item]));
} catch (InvalidArgumentException $e) {
$output[] = null;
}
}
}

return $output;
}
}
13 changes: 12 additions & 1 deletion src/core/etl/src/Flow/ETL/Function/ScalarFunctionChain.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

namespace Flow\ETL\Function;

use function Flow\ETL\DSL\lit;
use function Flow\ETL\DSL\{lit, ref, type_string};
use Flow\ETL\Exception\InvalidArgumentException;
use Flow\ETL\Function;
use Flow\ETL\Function\ArrayExpand\ArrayExpand;
Expand Down Expand Up @@ -303,6 +303,17 @@ public function numberFormat(?ScalarFunction $decimals = null, ?ScalarFunction $
return new NumberFormat($this, $decimals, $decimalSeparator, $thousandsSeparator);
}

/**
* Execute a scalar function on each element of an array/list/map/structure entry.
* In order to use this function, you need to provide a reference to the "element" that will be used in the function.
*
* Example: $df->withEntry('array', ref('array')->onEach(ref('element')->cast(type_string())))
*/
public function onEach(self $cast, ScalarFunction|bool $preserveKeys = true) : OnEach
{
return new OnEach($this, $cast, $preserveKeys);
}

public function plus(ScalarFunction $ref) : self
{
return new Plus($this, $ref);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Tests\Integration\Function;

use function Flow\ETL\DSL\{df, from_array, ref, type_string};
use PHPUnit\Framework\TestCase;

final class OnEachTest extends TestCase
{
public function test_on_each_function() : void
{
$results = df()
->read(from_array([
['array' => ['a' => 1, 'b' => 2, 'c' => 3, 'd' => 4, 'e' => 5]],
['array' => ['f' => 1, 'g' => 2.3, 'h' => 3, 'i' => 4, 'j' => null]],
]))
->withEntry('array', ref('array')->onEach(ref('element')->cast(type_string())))
->fetch()
->toArray();

self::assertEquals(
[
['array' => ['a' => '1', 'b' => '2', 'c' => '3', 'd' => '4', 'e' => '5']],
['array' => ['f' => '1', 'g' => '2.3', 'h' => '3', 'i' => '4', 'j' => null]],
],
$results
);
}
}
75 changes: 75 additions & 0 deletions src/core/etl/tests/Flow/ETL/Tests/Unit/Function/OnEachTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Tests\Unit\Function;

use function Flow\ETL\DSL\{array_entry, ref, row, type_string};
use Flow\ETL\Adapter\Elasticsearch\Tests\Integration\TestCase;

final class OnEachTest extends TestCase
{
public function test_executing_function_on_each_value_from_array() : void
{
self::assertSame(
['1', '2', '3', '4', '5'],
ref('array')->onEach(ref('element')->cast(type_string()))
->eval(
row(
array_entry(
'array',
[1, 2, 3, 4, 5]
)
)
),
);
}

public function test_executing_function_on_each_value_from_empty_array() : void
{
self::assertSame(
[],
ref('array')->onEach(ref('element')->cast(type_string()))
->eval(
row(
array_entry(
'array',
[]
)
)
),
);
}

public function test_executing_function_on_each_value_with_preserving_keys() : void
{
self::assertSame(
['a' => '1', 'b' => '2', 'c' => '3', 'd' => '4', 'e' => '5'],
ref('array')->onEach(ref('element')->cast(type_string()), true)
->eval(
row(
array_entry(
'array',
['a' => 1, 'b' => 2, 'c' => 3, 'd' => 4, 'e' => 5]
)
)
),
);
}

public function test_executing_function_on_each_value_without_preserving_keys() : void
{
self::assertSame(
['1', '2', '3', '4', '5'],
ref('array')->onEach(ref('element')->cast(type_string()), false)
->eval(
row(
array_entry(
'array',
['a' => 1, 'b' => 2, 'c' => 3, 'd' => 4, 'e' => 5]
)
)
),
);
}
}

0 comments on commit 897a13b

Please sign in to comment.