Skip to content

Commit

Permalink
Respect additional/missing schema fields when creating rows (#988)
Browse files Browse the repository at this point in the history
  • Loading branch information
norberttech authored Feb 13, 2024
1 parent 413d3cf commit 1824cb3
Show file tree
Hide file tree
Showing 9 changed files with 188 additions and 17 deletions.
50 changes: 46 additions & 4 deletions src/core/etl/src/Flow/ETL/DSL/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
use Flow\ETL\ErrorHandler\ThrowError;
use Flow\ETL\Exception\InvalidArgumentException;
use Flow\ETL\Exception\InvalidLogicException;
use Flow\ETL\Exception\SchemaDefinitionNotFoundException;
use Flow\ETL\Extractor;
use Flow\ETL\Extractor\LocalFileListExtractor;
use Flow\ETL\Filesystem\Path;
Expand Down Expand Up @@ -916,12 +917,32 @@ function array_to_rows(array $data, EntryFactory $entryFactory = new NativeEntry
foreach ($data as $key => $value) {
$name = \is_int($key) ? 'e' . \str_pad((string) $key, 2, '0', STR_PAD_LEFT) : $key;

$entries[$name] = $entryFactory->create($name, $value, $schema);
try {
$entries[$name] = $entryFactory->create($name, $value, $schema);
} catch (SchemaDefinitionNotFoundException $e) {
if ($schema === null) {
throw $e;
}
}
}

foreach ($partitions as $partition) {
if (!\array_key_exists($partition->name, $entries)) {
$entries[$partition->name] = $entryFactory->create($partition->name, $partition->value, $schema);
try {
$entries[$partition->name] = $entryFactory->create($partition->name, $partition->value, $schema);
} catch (SchemaDefinitionNotFoundException $e) {
if ($schema === null) {
throw $e;
}
}
}
}

if ($schema !== null) {
foreach ($schema->definitions() as $definition) {
if (!\array_key_exists($definition->entry()->name(), $entries)) {
$entries[$definition->entry()->name()] = null_entry($definition->entry()->name());
}
}
}

Expand All @@ -935,12 +956,33 @@ function array_to_rows(array $data, EntryFactory $entryFactory = new NativeEntry

foreach ($row as $column => $value) {
$name = \is_int($column) ? 'e' . \str_pad((string) $column, 2, '0', STR_PAD_LEFT) : $column;
$entries[$name] = $entryFactory->create(\is_int($column) ? 'e' . \str_pad((string) $column, 2, '0', STR_PAD_LEFT) : $column, $value, $schema);

try {
$entries[$name] = $entryFactory->create(\is_int($column) ? 'e' . \str_pad((string) $column, 2, '0', STR_PAD_LEFT) : $column, $value, $schema);
} catch (SchemaDefinitionNotFoundException $e) {
if ($schema === null) {
throw $e;
}
}
}

foreach ($partitions as $partition) {
if (!\array_key_exists($partition->name, $entries)) {
$entries[$partition->name] = $entryFactory->create($partition->name, $partition->value, $schema);
try {
$entries[$partition->name] = $entryFactory->create($partition->name, $partition->value, $schema);
} catch (SchemaDefinitionNotFoundException $e) {
if ($schema === null) {
throw $e;
}
}
}
}

if ($schema !== null) {
foreach ($schema->definitions() as $definition) {
if (!\array_key_exists($definition->entry()->name(), $entries)) {
$entries[$definition->entry()->name()] = null_entry($definition->entry()->name());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

namespace Flow\ETL\Exception;

final class InvalidArgumentException extends Exception
class InvalidArgumentException extends Exception
{
public static function because(string $format, float|int|string ...$parameters) : self
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Exception;

final class SchemaDefinitionNotFoundException extends InvalidArgumentException
{
public function __construct(private string $entry)
{
parent::__construct(\sprintf('Schema definition for entry "%s" not found', $entry));
}

public function entry() : string
{
return $this->entry;
}
}
9 changes: 9 additions & 0 deletions src/core/etl/src/Flow/ETL/Row/EntryFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,16 @@

namespace Flow\ETL\Row;

use Flow\ETL\Exception\InvalidArgumentException;
use Flow\ETL\Exception\RuntimeException;
use Flow\ETL\Exception\SchemaDefinitionNotFoundException;

interface EntryFactory
{
/**
* @throws InvalidArgumentException
* @throws RuntimeException
* @throws SchemaDefinitionNotFoundException
*/
public function create(string $entryName, mixed $value, ?Schema $schema = null) : Entry;
}
2 changes: 2 additions & 0 deletions src/core/etl/src/Flow/ETL/Row/Factory/NativeEntryFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
use function Flow\ETL\DSL\xml_node_entry;
use Flow\ETL\Exception\InvalidArgumentException;
use Flow\ETL\Exception\RuntimeException;
use Flow\ETL\Exception\SchemaDefinitionNotFoundException;
use Flow\ETL\PHP\Type\Caster;
use Flow\ETL\PHP\Type\Caster\StringCastingHandler\StringTypeChecker;
use Flow\ETL\PHP\Type\Logical\DateTimeType;
Expand Down Expand Up @@ -56,6 +57,7 @@ public function __construct()
/**
* @throws InvalidArgumentException
* @throws RuntimeException
* @throws SchemaDefinitionNotFoundException
*/
public function create(string $entryName, mixed $value, ?Schema $schema = null) : Entry
{
Expand Down
5 changes: 3 additions & 2 deletions src/core/etl/src/Flow/ETL/Row/Schema.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Flow\ETL\Row;

use Flow\ETL\Exception\InvalidArgumentException;
use Flow\ETL\Exception\SchemaDefinitionNotFoundException;
use Flow\ETL\Row\Schema\Definition;

final class Schema implements \Countable
Expand Down Expand Up @@ -89,11 +90,11 @@ public function findDefinition(string|Reference $ref) : ?Definition
}

/**
* @throws InvalidArgumentException
* @throws SchemaDefinitionNotFoundException
*/
public function getDefinition(string|Reference $ref) : Definition
{
return $this->findDefinition($ref) ?: throw new InvalidArgumentException("There is no definition for \"{$ref}\" in the schema.");
return $this->findDefinition($ref) ?: throw new SchemaDefinitionNotFoundException((string) $ref);
}

public function merge(self $schema) : self
Expand Down
5 changes: 0 additions & 5 deletions src/core/etl/src/Flow/ETL/Transformer/AutoCastTransformer.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
use Flow\ETL\PHP\Type\AutoCaster;
use Flow\ETL\Row;
use Flow\ETL\Row\Entry;
use Flow\ETL\Row\Entry\StringEntry;
use Flow\ETL\Rows;
use Flow\ETL\Transformer;

Expand All @@ -22,10 +21,6 @@ public function transform(Rows $rows, FlowContext $context) : Rows
{
return $rows->map(function (Row $row) use ($context) {
return $row->map(function (Entry $entry) use ($context) {
// if (!$entry instanceof StringEntry) {
// return $entry;
// }

return $context->entryFactory()->create($entry->name(), $this->caster->cast($entry->value()));
});
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
use function Flow\ETL\DSL\uuid_entry;
use function Flow\ETL\DSL\xml_entry;
use Flow\ETL\Exception\CastingException;
use Flow\ETL\Exception\InvalidArgumentException;
use Flow\ETL\Exception\SchemaDefinitionNotFoundException;
use Flow\ETL\PHP\Type\Logical\List\ListElement;
use Flow\ETL\PHP\Type\Logical\ListType;
use Flow\ETL\PHP\Type\Logical\Structure\StructureElement;
Expand Down Expand Up @@ -427,17 +427,15 @@ public function test_uuid_string_with_uuid_definition_provided() : void

public function test_with_empty_schema() : void
{
$this->expectException(InvalidArgumentException::class);
$this->expectExceptionMessage('There is no definition for "e" in the schema.');
$this->expectException(SchemaDefinitionNotFoundException::class);

(new NativeEntryFactory())
->create('e', '1', new Schema());
}

public function test_with_schema_for_different_entry() : void
{
$this->expectException(InvalidArgumentException::class);
$this->expectExceptionMessage('There is no definition for "diff" in the schema.');
$this->expectException(SchemaDefinitionNotFoundException::class);

(new NativeEntryFactory())
->create('diff', '1', new Schema(Schema\Definition::string('e')));
Expand Down
106 changes: 106 additions & 0 deletions src/core/etl/tests/Flow/ETL/Tests/Unit/RowsTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
use function Flow\ETL\DSL\array_entry;
use function Flow\ETL\DSL\array_to_rows;
use function Flow\ETL\DSL\bool_entry;
use function Flow\ETL\DSL\bool_schema;
use function Flow\ETL\DSL\datetime_entry;
use function Flow\ETL\DSL\int_entry;
use function Flow\ETL\DSL\int_schema;
use function Flow\ETL\DSL\list_entry;
use function Flow\ETL\DSL\null_entry;
use function Flow\ETL\DSL\partition;
Expand All @@ -18,6 +20,7 @@
use function Flow\ETL\DSL\rows;
use function Flow\ETL\DSL\rows_partitioned;
use function Flow\ETL\DSL\str_entry;
use function Flow\ETL\DSL\str_schema;
use function Flow\ETL\DSL\type_int;
use function Flow\ETL\DSL\type_list;
use function Flow\ETL\DSL\type_string;
Expand Down Expand Up @@ -178,6 +181,50 @@ public function test_array_access_unset() : void
unset($rows[0]);
}

public function test_building_row_from_array_with_schema_and_additional_fields_not_covered_by_schema() : void
{
$rows = array_to_rows(
['id' => 1234, 'deleted' => false, 'phase' => null],
schema: new Schema(
int_schema('id'),
bool_schema('deleted'),
)
);

$this->assertEquals(
rows(
row(
int_entry('id', 1234),
bool_entry('deleted', false),
),
),
$rows
);
}

public function test_building_row_from_array_with_schema_but_entries_not_available_in_rows() : void
{
$rows = array_to_rows(
['id' => 1234, 'deleted' => false],
schema: new Schema(
int_schema('id'),
bool_schema('deleted'),
str_schema('phase', true),
)
);

$this->assertEquals(
rows(
row(
int_entry('id', 1234),
bool_entry('deleted', false),
null_entry('phase')
),
),
$rows
);
}

public function test_building_rows_from_array() : void
{
$rows = array_to_rows(
Expand All @@ -204,6 +251,65 @@ public function test_building_rows_from_array() : void
);
}

public function test_building_rows_from_array_with_schema_and_additional_fields_not_covered_by_schema() : void
{
$rows = array_to_rows(
[
['id' => 1234, 'deleted' => false, 'phase' => null],
['id' => 4321, 'deleted' => true, 'phase' => 'launch'],
],
schema: new Schema(
int_schema('id'),
bool_schema('deleted'),
)
);

$this->assertEquals(
rows(
row(
int_entry('id', 1234),
bool_entry('deleted', false),
),
row(
int_entry('id', 4321),
bool_entry('deleted', true),
)
),
$rows
);
}

public function test_building_rows_from_array_with_schema_but_entries_not_available_in_rows() : void
{
$rows = array_to_rows(
[
['id' => 1234, 'deleted' => false],
['id' => 4321, 'deleted' => true],
],
schema: new Schema(
int_schema('id'),
bool_schema('deleted'),
str_schema('phase', true),
)
);

$this->assertEquals(
rows(
row(
int_entry('id', 1234),
bool_entry('deleted', false),
null_entry('phase')
),
row(
int_entry('id', 4321),
bool_entry('deleted', true),
null_entry('phase')
)
),
$rows
);
}

public function test_chunks_with_less() : void
{
$rows = rows(
Expand Down

0 comments on commit 1824cb3

Please sign in to comment.