Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added DataFrame::autoCast() #923

Merged
merged 2 commits into from
Jan 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions examples/topics/types/csv/csv_read_with_autocast.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<?php

declare(strict_types=1);

use function Flow\ETL\Adapter\CSV\from_csv;
use function Flow\ETL\DSL\to_output;
use Aeon\Calendar\Stopwatch;
use Flow\ETL\Flow;
use Flow\ETL\Loader\StreamLoader\Output;

require __DIR__ . '/../../../bootstrap.php';

// The dataset is created on demand by the setup script when it is not there yet.
if (!\file_exists(__FLOW_OUTPUT__ . '/dataset.csv')) {
    include __DIR__ . '/../../../setup/php_to_csv.php';
}

// Read the first 1000 CSV rows, let autoCast() replace string entries with typed ones,
// collect everything into a single batch and print rows together with their schema.
$flow = (new Flow())
    ->read(from_csv(__FLOW_OUTPUT__ . '/dataset.csv'))
    ->limit(1000)
    ->autoCast()
    ->collect()
    ->write(to_output(false, Output::rows_and_schema));

// When FLOW_PHAR_APP is set, hand the prepared flow back to the including script
// instead of executing it here.
if ($_ENV['FLOW_PHAR_APP'] ?? false) {
    return $flow;
}

$sizeInBytes = \filesize(__FLOW_OUTPUT__ . '/dataset.csv');
$sizeInMb = \round($sizeInBytes / 1024 / 1024);
echo "Reading CSV {$sizeInMb}Mb file...\n";

// Time only the execution of the flow itself.
$timer = new Stopwatch();
$timer->start();

$flow->run();

$timer->stop();

$elapsed = $timer->totalElapsedTime()->inSecondsPrecise();
echo "Total elapsed time: {$elapsed}s\n";
8 changes: 8 additions & 0 deletions src/core/etl/src/Flow/ETL/DataFrame.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
use Flow\ETL\Row\Reference;
use Flow\ETL\Row\References;
use Flow\ETL\Row\Schema;
use Flow\ETL\Transformer\AutoCastTransformer;
use Flow\ETL\Transformer\CallbackRowTransformer;
use Flow\ETL\Transformer\CrossJoinRowsTransformer;
use Flow\ETL\Transformer\DropDuplicatesTransformer;
Expand Down Expand Up @@ -147,6 +148,13 @@ public function appendSafe(bool $appendSafe = true) : self
return $this;
}

/**
 * Adds an AutoCastTransformer to the pipeline.
 *
 * @return self this DataFrame, so that calls can be chained
 */
public function autoCast() : self
{
    $autoCaster = new AutoCastTransformer();
    $this->pipeline->add($autoCaster);

    return $this;
}

/**
* Merge/Split Rows yielded by Extractor into batches of given size.
* For example, when Extractor is yielding one row at time, this method will merge them into batches of given size
Expand Down
77 changes: 9 additions & 68 deletions src/core/etl/src/Flow/ETL/Row/Factory/NativeEntryFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -58,20 +58,18 @@ public function create(string $entryName, mixed $value, ?Schema $schema = null)

if ($valueType instanceof ScalarType) {
if ($valueType->isString()) {
$trimmedValue = \trim($value);
$stringChecker = new StringTypeChecker($value);

if ('' !== $trimmedValue) {
if ($this->isJson($trimmedValue)) {
return json_entry($entryName, $value);
}
if ($stringChecker->isJson()) {
return json_entry($entryName, $value);
}

if ($this->isUuid($trimmedValue)) {
return uuid_entry($entryName, Entry\Type\Uuid::fromString($value));
}
if ($stringChecker->isUuid()) {
return uuid_entry($entryName, Entry\Type\Uuid::fromString($value));
}

if ($this->isXML($trimmedValue)) {
return xml_entry($entryName, $value);
}
if ($stringChecker->isXML()) {
return xml_entry($entryName, $value);
}

return str_entry($entryName, $value);
Expand Down Expand Up @@ -246,61 +244,4 @@ private function fromDefinition(Schema\Definition $definition, mixed $value) : E

throw new InvalidArgumentException("Can't convert value into entry \"{$definition->entry()}\"");
}

/**
 * Tells whether the given string is a JSON document holding an object or an array.
 *
 * Only values wrapped in {...} or [...] are decoded; anything else, including an
 * empty string, is rejected up front without being parsed and without raising a warning.
 */
private function isJson(string $string) : bool
{
    // str_starts_with()/str_ends_with() are safe on an empty string, unlike $string[0].
    $looksLikeObject = \str_starts_with($string, '{') && \str_ends_with($string, '}');
    $looksLikeList = \str_starts_with($string, '[') && \str_ends_with($string, ']');

    if (!$looksLikeObject && !$looksLikeList) {
        return false;
    }

    try {
        return \is_array(\json_decode($string, true, flags: \JSON_THROW_ON_ERROR));
    } catch (\JsonException) {
        // Malformed JSON is simply "not JSON" for the purpose of type detection.
        return false;
    }
}

/**
 * Tells whether the given string is exactly 36 bytes long and is accepted by Uuid::UUID_REGEXP.
 */
private function isUuid(string $string) : bool
{
    // The length test is a cheap filter that avoids running the regular expression needlessly.
    return 36 === \strlen($string)
        && 0 !== \preg_match(Entry\Type\Uuid::UUID_REGEXP, $string);
}

/**
 * Tells whether the given string is an XML document that DOMDocument can load.
 *
 * A string must start with "<" and contain at least one "<tag>content</tag>" shaped
 * fragment before the (comparatively expensive) parser is involved.
 */
private function isXML(string $string) : bool
{
    // str_starts_with() also covers the empty string, where $string[0] would raise a warning.
    if (!\str_starts_with($string, '<')) {
        return false;
    }

    if (\preg_match('/<(.+?)>(.+?)<\/(.+?)>/', $string) !== 1) {
        return false;
    }

    // Silence libxml warnings while probing, remembering the mode the caller had configured.
    $previousState = \libxml_use_internal_errors(true);

    try {
        $doc = new \DOMDocument();

        /** @psalm-suppress RedundantCastGivenDocblockType */
        return (bool) $doc->loadXML($string);
    } catch (\Exception) {
        return false;
    } finally {
        // Drop the errors collected during the probe and restore the caller's mode
        // instead of unconditionally switching internal errors off.
        \libxml_clear_errors();
        \libxml_use_internal_errors($previousState);
    }
}
}
158 changes: 158 additions & 0 deletions src/core/etl/src/Flow/ETL/Row/Factory/StringTypeChecker.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Row\Factory;

use Flow\ETL\Row\Entry\Type\Uuid;

/**
 * Inspects a string and reports which richer type its content could represent.
 *
 * The value is trimmed once in the constructor and every check works on that trimmed copy.
 * Apart from isNull(), each check returns false for an empty string.
 */
final class StringTypeChecker
{
    private readonly string $string;

    public function __construct(string $string)
    {
        $this->string = \trim($string);
    }

    /**
     * True when the value is "true" or "false", compared case-insensitively.
     */
    public function isBoolean() : bool
    {
        if ($this->string === '') {
            return false;
        }

        return \in_array(\strtolower($this->string), ['true', 'false'], true);
    }

    /**
     * True when date_parse() reports no errors and found a year, a month and a day.
     *
     * Values carrying only a time (or only part of a date) are therefore rejected.
     */
    public function isDateTime() : bool
    {
        if ($this->string === '') {
            return false;
        }

        $dateParts = \date_parse($this->string);

        if ($dateParts['error_count'] > 0) {
            return false;
        }

        if ($dateParts['year'] === false) {
            return false;
        }

        if ($dateParts['month'] === false) {
            return false;
        }

        if ($dateParts['day'] === false) {
            return false;
        }

        return true;
    }

    /**
     * True when the value is numeric and contains a decimal point.
     */
    public function isFloat() : bool
    {
        if ($this->string === '') {
            return false;
        }

        return \is_numeric($this->string) && \str_contains($this->string, '.');
    }

    /**
     * True when the value is numeric and survives an int round trip unchanged.
     *
     * The round trip rejects notations such as "007", "+5" or "1e3".
     */
    public function isInteger() : bool
    {
        if ($this->string === '') {
            return false;
        }

        if (\is_numeric($this->string)) {
            return (string) ((int) $this->string) === $this->string;
        }

        return false;
    }

    /**
     * True when the value is a valid JSON document that starts with "{" or "[".
     *
     * Uses json_validate() when the runtime provides it and falls back to decoding otherwise.
     */
    public function isJson() : bool
    {
        if ($this->string === '') {
            return false;
        }

        if ('{' !== $this->string[0] && '[' !== $this->string[0]) {
            return false;
        }

        if (\function_exists('json_validate')) {
            return \json_validate($this->string);
        }

        // Cheap structural test before paying for a full decode.
        if (
            (!\str_starts_with($this->string, '{') || !\str_ends_with($this->string, '}'))
            && (!\str_starts_with($this->string, '[') || !\str_ends_with($this->string, ']'))
        ) {
            return false;
        }

        try {
            return \is_array(\json_decode($this->string, true, flags: \JSON_THROW_ON_ERROR));
        } catch (\Exception) {
            return false;
        }
    }

    /**
     * True when the value is "null" or "nil", compared case-insensitively.
     */
    public function isNull() : bool
    {
        return \in_array(\mb_strtolower($this->string), ['null', 'nil'], true);
    }

    /**
     * True when the value is exactly 36 bytes long and is accepted by Uuid::UUID_REGEXP.
     */
    public function isUuid() : bool
    {
        if ($this->string === '') {
            return false;
        }

        if (\strlen($this->string) !== 36) {
            return false;
        }

        return 0 !== \preg_match(Uuid::UUID_REGEXP, $this->string);
    }

    /**
     * True when the value starts with "<", contains a "<tag>content</tag>" shaped fragment
     * and can be loaded by DOMDocument.
     *
     * libxml warnings are suppressed during the check; the error-handling mode that was
     * active before the call is restored afterwards.
     */
    public function isXML() : bool
    {
        if ($this->string === '') {
            return false;
        }

        if ('<' !== $this->string[0]) {
            return false;
        }

        if (\preg_match('/<(.+?)>(.+?)<\/(.+?)>/', $this->string) !== 1) {
            return false;
        }

        // Remember the caller's mode: libxml_use_internal_errors() returns the previous value.
        $previousState = \libxml_use_internal_errors(true);

        try {
            $doc = new \DOMDocument();

            /** @psalm-suppress RedundantCastGivenDocblockType */
            return (bool) $doc->loadXML($this->string);
        } catch (\Exception) {
            return false;
        } finally {
            // Drop the errors collected by this probe and hand back the original mode
            // rather than unconditionally switching internal errors off.
            \libxml_clear_errors();
            \libxml_use_internal_errors($previousState);
        }
    }

    /**
     * Returns the trimmed value all checks operate on.
     */
    public function value() : string
    {
        return $this->string;
    }
}
70 changes: 70 additions & 0 deletions src/core/etl/src/Flow/ETL/Transformer/AutoCastTransformer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Transformer;

use function Flow\ETL\DSL\bool_entry;
use function Flow\ETL\DSL\datetime_entry;
use function Flow\ETL\DSL\float_entry;
use function Flow\ETL\DSL\int_entry;
use function Flow\ETL\DSL\json_entry;
use function Flow\ETL\DSL\null_entry;
use function Flow\ETL\DSL\uuid_entry;
use Flow\ETL\FlowContext;
use Flow\ETL\Row;
use Flow\ETL\Row\Entry;
use Flow\ETL\Row\Entry\StringEntry;
use Flow\ETL\Rows;
use Flow\ETL\Transformer;

/**
 * Replaces string entries with typed entries whenever StringTypeChecker recognises
 * their content. Entries that are not StringEntry instances pass through untouched.
 */
final class AutoCastTransformer implements Transformer
{
    /**
     * Casts a single entry to the first matching type.
     *
     * Checks run in this order: null, integer, float, boolean, JSON, UUID, datetime.
     * When none matches, the original entry is returned.
     *
     * @param Entry $entry entry to inspect; only StringEntry instances are ever converted
     *
     * @return Entry the typed replacement, or the entry that was passed in
     */
    public function autoCast(Entry $entry) : Entry
    {
        if (!$entry instanceof StringEntry) {
            return $entry;
        }

        $name = $entry->name();
        $value = $entry->value();
        $typeChecker = new Row\Factory\StringTypeChecker($value);

        // match (true) evaluates the conditions top to bottom and stops at the first hit.
        return match (true) {
            $typeChecker->isNull() => null_entry($name),
            $typeChecker->isInteger() => int_entry($name, (int) $value),
            $typeChecker->isFloat() => float_entry($name, (float) $value),
            // A (bool) cast would turn "false" into true, since every non-empty string
            // other than "0" is truthy. Compare the trimmed text the checker validated.
            $typeChecker->isBoolean() => bool_entry($name, 'true' === \strtolower($typeChecker->value())),
            $typeChecker->isJson() => json_entry($name, $value),
            $typeChecker->isUuid() => uuid_entry($name, $value),
            $typeChecker->isDateTime() => datetime_entry($name, $value),
            default => $entry,
        };
    }

    /**
     * Applies autoCast() to every entry of every row.
     */
    public function transform(Rows $rows, FlowContext $context) : Rows
    {
        // Possible future optimisation (raised in the review of this change): cache the type
        // detected for each entry after the first cast, so that detection is not repeated for
        // every row. Rows are not guaranteed to share types, so such a cache would have to
        // catch the failed cast and fall back to autoCast().
        return $rows->map(
            fn (Row $row) => $row->map(
                fn (Entry $entry) => $this->autoCast($entry)
            )
        );
    }
}
Loading