Skip to content

Commit

Permalink
Fixed Json extractor when pointer is used to extract flat arrays (#1211)
Browse files Browse the repository at this point in the history
* Fixed Json extractor when pointer is used to extract flat arrays

* Removed commented code
  • Loading branch information
norberttech committed Sep 6, 2024
1 parent a244986 commit 4eaee66
Show file tree
Hide file tree
Showing 6 changed files with 332 additions and 208 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

namespace Flow\ETL\Adapter\JSON\JSONMachine;

use function Flow\ETL\DSL\array_to_rows;
use function Flow\ETL\DSL\{array_to_rows};
use Flow\ETL\Extractor\{FileExtractor, Limitable, LimitableExtractor, PartitionExtractor, PathFiltering, Signal};
use Flow\ETL\Row\Schema;
use Flow\ETL\{Extractor, FlowContext};
Expand Down Expand Up @@ -43,7 +43,15 @@ public function extract(FlowContext $context) : \Generator
$row['_input_file_uri'] = $stream->path()->uri();
}

$signal = yield array_to_rows($row, $context->entryFactory(), $stream->path()->partitions(), $this->schema);
if ($this->pointer !== null) {
$row = [$this->pointer => $row];
}

if (!\count($row)) {
continue;
}

$signal = yield array_to_rows([$row], $context->entryFactory(), $stream->path()->partitions(), $this->schema);
$this->incrementReturnedRows();

if ($signal === Signal::STOP || $this->reachedLimit()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public function test_extracting_json_from_local_file_stream_using_pointer() : vo
'capital',

],
\array_keys($row->toArray())
\array_keys($row->get('/timezones')->value())
);
}

Expand Down
6 changes: 0 additions & 6 deletions src/core/etl/src/Flow/ETL/DSL/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -997,12 +997,6 @@ function number_format(ScalarFunction|int|float $value, ScalarFunction|int $deci
#[DocumentationDSL(module: Module::CORE, type: DSLType::DATA_FRAME)]
function array_to_row(array $data, EntryFactory $entryFactory = new NativeEntryFactory(), array|Partitions $partitions = [], ?Schema $schema = null) : Row
{
foreach ($data as $key => $v) {
if (!\is_string($key)) {
throw new InvalidArgumentException('Passed array keys must be a string. Maybe consider using "array_to_rows()" function?');
}
}

$entries = [];

foreach ($data as $key => $value) {
Expand Down
128 changes: 128 additions & 0 deletions src/core/etl/tests/Flow/ETL/Tests/Unit/Row/ArrayToRowTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Tests\Unit\Row;

use function Flow\ETL\DSL\{array_to_row,
bool_entry,
bool_schema,
int_entry,
int_schema,
list_entry,
row,
str_entry,
str_schema,
struct_element,
struct_entry,
struct_type,
type_boolean,
type_int,
type_list,
type_null,
type_string};
use Flow\ETL\Row\Schema;
use PHPUnit\Framework\TestCase;

final class ArrayToRowTest extends TestCase
{
public function test_building_array_to_row_with_entry_that_is_list_of_strings() : void
{
$row = array_to_row(['data' => ['a', 'b', 'c', 'd']]);

self::assertEquals(
row(list_entry('data', ['a', 'b', 'c', 'd'], type_list(type_string()))),
$row
);
}

public function test_building_single_row_from_array_with_rows_fails() : void
{
$row = array_to_row(
[
['id' => 1234, 'deleted' => false, 'phase' => null],
['id' => 4321, 'deleted' => true, 'phase' => 'launch'],
]
);

self::assertEquals(
row(
struct_entry(
'e00',
['id' => 1234, 'deleted' => false, 'phase' => null],
struct_type([
struct_element('id', type_int()),
struct_element('deleted', type_boolean()),
struct_element('phase', type_null()),
])
),
struct_entry(
'e01',
['id' => 4321, 'deleted' => true, 'phase' => 'launch'],
struct_type([
struct_element('id', type_int()),
struct_element('deleted', type_boolean()),
struct_element('phase', type_string()),
])
)
),
$row
);
}

public function test_building_single_row_from_array_with_schema_and_additional_fields_not_covered_by_schema() : void
{
$row = array_to_row(
['id' => 1234, 'deleted' => false, 'phase' => null],
schema: new Schema(
int_schema('id'),
bool_schema('deleted'),
)
);

self::assertEquals(
row(
int_entry('id', 1234),
bool_entry('deleted', false),
),
$row
);
}

public function test_building_single_row_from_array_with_schema_but_entries_not_available_in_rows() : void
{
$row = array_to_row(
['id' => 1234, 'deleted' => false],
schema: new Schema(
int_schema('id'),
bool_schema('deleted'),
str_schema('phase', true),
)
);

self::assertEquals(
row(
int_entry('id', 1234),
bool_entry('deleted', false),
str_entry('phase', null)
),
$row
);
}

public function test_building_single_row_from_flat_array() : void
{
$row = array_to_row(
['id' => 1234, 'deleted' => false, 'phase' => null],
);

self::assertEquals(
row(
int_entry('id', 1234),
bool_entry('deleted', false),
str_entry('phase', null),
),
$row
);
}
}
192 changes: 192 additions & 0 deletions src/core/etl/tests/Flow/ETL/Tests/Unit/Rows/ArrayToRowsTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Tests\Unit\Rows;

use function Flow\ETL\DSL\{array_to_rows,
bool_entry,
bool_schema,
int_entry,
int_schema,
list_entry,
row,
rows,
str_entry,
str_schema,
type_list,
type_string};
use Flow\ETL\Row\Schema;
use PHPUnit\Framework\TestCase;

final class ArrayToRowsTest extends TestCase
{
public function test_building_array_to_rows_with_entry_that_is_list_of_strings() : void
{
$rows = array_to_rows(
[
['data' => ['a', 'b', 'c', 'd']],
['data' => ['e', 'f', 'g', 'd']],
]
);

self::assertEquals(
rows(
row(
list_entry('data', ['a', 'b', 'c', 'd'], type_list(type_string())),
),
row(
list_entry('data', ['e', 'f', 'g', 'd'], type_list(type_string())),
),
),
$rows
);
}

public function test_building_array_to_rows_with_entry_that_is_list_of_strings_with_one_row() : void
{
$rows = array_to_rows(
[
['data' => ['e', 'f', 'g', 'd']],
]
);

self::assertEquals(
rows(
row(
list_entry('data', ['e', 'f', 'g', 'd'], type_list(type_string())),
),
),
$rows
);
}

public function test_building_row_from_array_with_schema_and_additional_fields_not_covered_by_schema() : void
{
$rows = array_to_rows(
['id' => 1234, 'deleted' => false, 'phase' => null],
schema: new Schema(
int_schema('id'),
bool_schema('deleted'),
)
);

self::assertEquals(
rows(
row(
int_entry('id', 1234),
bool_entry('deleted', false),
),
),
$rows
);
}

public function test_building_row_from_array_with_schema_but_entries_not_available_in_rows() : void
{
$rows = array_to_rows(
['id' => 1234, 'deleted' => false],
schema: new Schema(
int_schema('id'),
bool_schema('deleted'),
str_schema('phase', true),
)
);

self::assertEquals(
rows(
row(
int_entry('id', 1234),
bool_entry('deleted', false),
str_entry('phase', null)
),
),
$rows
);
}

public function test_building_rows_from_array() : void
{
$rows = array_to_rows(
[
['id' => 1234, 'deleted' => false, 'phase' => null],
['id' => 4321, 'deleted' => true, 'phase' => 'launch'],
]
);

self::assertEquals(
rows(
row(
int_entry('id', 1234),
bool_entry('deleted', false),
str_entry('phase', null),
),
row(
int_entry('id', 4321),
bool_entry('deleted', true),
str_entry('phase', 'launch'),
)
),
$rows
);
}

public function test_building_rows_from_array_with_schema_and_additional_fields_not_covered_by_schema() : void
{
$rows = array_to_rows(
[
['id' => 1234, 'deleted' => false, 'phase' => null],
['id' => 4321, 'deleted' => true, 'phase' => 'launch'],
],
schema: new Schema(
int_schema('id'),
bool_schema('deleted'),
)
);

self::assertEquals(
rows(
row(
int_entry('id', 1234),
bool_entry('deleted', false),
),
row(
int_entry('id', 4321),
bool_entry('deleted', true),
)
),
$rows
);
}

public function test_building_rows_from_array_with_schema_but_entries_not_available_in_rows() : void
{
$rows = array_to_rows(
[
['id' => 1234, 'deleted' => false],
['id' => 4321, 'deleted' => true],
],
schema: new Schema(
int_schema('id'),
bool_schema('deleted'),
str_schema('phase', true),
)
);

self::assertEquals(
rows(
row(
int_entry('id', 1234),
bool_entry('deleted', false),
str_entry('phase', null)
),
row(
int_entry('id', 4321),
bool_entry('deleted', true),
str_entry('phase', null)
)
),
$rows
);
}
}
Loading

0 comments on commit 4eaee66

Please sign in to comment.