From d6a2ab7e1082bba52291cd11a397869db1e0ae69 Mon Sep 17 00:00:00 2001 From: Norbert Orzechowicz Date: Thu, 18 Jan 2024 11:04:47 +0100 Subject: [PATCH 1/2] Checkpoint --- .../src/Flow/ETL/Adapter/CSV/CSVDetector.php | 100 +++++++++++++++ .../src/Flow/ETL/Adapter/CSV/CSVExtractor.php | 2 +- .../src/Flow/ETL/Adapter/CSV/functions.php | 5 + .../CSV/Tests/Integration/CSVDetectorTest.php | 115 ++++++++++++++++++ 4 files changed, 221 insertions(+), 1 deletion(-) create mode 100644 src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/CSVDetector.php create mode 100644 src/adapter/etl-adapter-csv/tests/Flow/ETL/Adapter/CSV/Tests/Integration/CSVDetectorTest.php diff --git a/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/CSVDetector.php b/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/CSVDetector.php new file mode 100644 index 000000000..693678520 --- /dev/null +++ b/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/CSVDetector.php @@ -0,0 +1,100 @@ +resource = $resource; + $this->startingPosition = \ftell($resource); + } + + public function __destruct() + { + \fseek($this->resource, $this->startingPosition); + } + + /** + * @throws InvalidArgumentException + * @throws RuntimeException + */ + public function separator(int $lines = 5) : string + { + if ($lines < 1) { + throw new InvalidArgumentException('Lines must be greater than 0'); + } + + $delimiters = [ + ',' => [], + "\t" => [], + ';' => [], + '|' => [], + ' ' => [], + '_' => [], + '-' => [], + ':' => [], + ]; + + $readLines = 1; + + while ($line = \fgets($this->resource)) { + foreach ($delimiters as $delimiter => $count) { + $row = \str_getcsv($line, $delimiter); + $delimiters[$delimiter][] = \count($row); + } + + if ($readLines++ >= $lines) { + break; + } + } + + foreach ($delimiters as $delimiter => $rows) { + $columnsCount = null; + + foreach ($rows as $rowColumns) { + if ($columnsCount === null) { + $columnsCount = $rowColumns; + } + + if ($columnsCount !== $rowColumns) { + unset($delimiters[$delimiter]); + + break; + } + } + } + + $delimiters = \array_map(fn (array $rows) : int => \array_sum($rows), $delimiters); + + \arsort($delimiters); + + $delimiters = \array_filter($delimiters, fn (int $count) : bool => $count > $lines); + + if (!\count($delimiters)) { + \fseek($this->resource, $this->startingPosition); + + throw new RuntimeException('Cannot detect delimiter'); + } + + \fseek($this->resource, $this->startingPosition); + + return \array_key_first($delimiters); + } +} diff --git a/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/CSVExtractor.php b/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/CSVExtractor.php index 2eddee69a..4abbab8a0 100644 --- a/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/CSVExtractor.php +++ b/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/CSVExtractor.php @@ -28,7 +28,7 @@ public function __construct( private readonly Path $path, private readonly bool $withHeader = true, private readonly bool $emptyToNull = true, - private readonly string $separator = ',', + private readonly string|null $separator = null, private readonly string $enclosure = '"', private readonly string $escape = '\\', private readonly int $charactersReadInLine = 1000 diff --git a/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/functions.php b/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/functions.php index 6693bdc1e..df045bc2a 100644 --- a/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/functions.php +++ b/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/functions.php @@ -67,3 +67,8 @@ function to_csv( $new_line_separator ); } + +function csv_detect_separator($resource, int $lines = 5) : string +{ + return (new CSVDetector($resource))->separator($lines); +} diff --git a/src/adapter/etl-adapter-csv/tests/Flow/ETL/Adapter/CSV/Tests/Integration/CSVDetectorTest.php b/src/adapter/etl-adapter-csv/tests/Flow/ETL/Adapter/CSV/Tests/Integration/CSVDetectorTest.php new file mode 100644 index 000000000..f994ae749 --- /dev/null +++ b/src/adapter/etl-adapter-csv/tests/Flow/ETL/Adapter/CSV/Tests/Integration/CSVDetectorTest.php @@ -0,0 +1,115 @@ +createResource(',')); + + $this->assertSame(',', $detector->separator()); + } + + public function test_detecting_comma_with_custom_enclosure() : void + { + $detector = new CSVDetector($this->createResource(',', "'")); + + $this->assertSame(',', $detector->separator()); + } + + public function test_detecting_dash() : void + { + $detector = new CSVDetector($this->createResource('-')); + + $this->assertSame('-', $detector->separator()); + } + + public function test_detecting_double_dot() : void + { + $detector = new CSVDetector($this->createResource(':')); + + $this->assertSame(':', $detector->separator()); + } + + public function test_detecting_no_delimiter() : void + { + $this->expectException(RuntimeException::class); + $this->expectExceptionMessage('Cannot detect delimiter'); + + $detector = new CSVDetector($this->createResource('{')); + $detector->separator(); + } + + public function test_detecting_pipe() : void + { + $detector = new CSVDetector($this->createResource('|')); + + $this->assertSame('|', $detector->separator()); + } + + public function test_detecting_semicolon() : void + { + $detector = new CSVDetector($this->createResource(';')); + + $this->assertSame(';', $detector->separator()); + } + + public function test_detecting_space() : void + { + $detector = new CSVDetector($this->createResource(' ')); + + $this->assertSame(' ', $detector->separator()); + } + + public function test_detecting_tab_delimiter() : void + { + $detector = new CSVDetector($this->createResource("\t")); + + $this->assertSame("\t", $detector->separator()); + } + + public function test_detecting_underscore() : void + { + $detector = new CSVDetector($this->createResource('_')); + + $this->assertSame('_', $detector->separator()); + } + + /** + * @return resource + */ + private function createResource(string $separator, string $enclosure = '"') + { + $data = [ + ['id', 'name', 'email'], + ['1', 'John Doe', 'john@example.com'], + ['2', 'Jane Doe', 'jane@example.com'], + ['3', 'Mark', 'mark@example.com'], + ['4', 'Kate', 'kate@example.com'], + ['5', 'Peter', 'peter@example.com'], + ['6', 'Paul', 'paul@example.com'], + ['7', 'Mary', 'mary@example.com'], + ['8', 'Anna', 'anna@example.com'], + ['9', 'Robert', 'rober@example.com'], + ['10', 'Lucy', 'lucy@example.com'], + ['11', 'Ro\'bert', 'rob_ert@example.com'], + ]; + + $resource = \fopen('php://memory', 'rb+'); + + foreach ($data as $line) { + \fputcsv($resource, $line, $separator, $enclosure); + } + + \rewind($resource); + + return $resource; + } +} From f3b5b23b477de16f0b593d085254a84dfc15d200 Mon Sep 17 00:00:00 2001 From: Norbert Orzechowicz Date: Thu, 18 Jan 2024 13:12:01 +0100 Subject: [PATCH 2/2] Added automated detection of CSV separator and enclousure --- .../src/Flow/ETL/Adapter/CSV/CSVDetector.php | 74 +++++-------- .../src/Flow/ETL/Adapter/CSV/CSVExtractor.php | 18 ++-- .../Flow/ETL/Adapter/CSV/Detector/Option.php | 94 ++++++++++++++++ .../Flow/ETL/Adapter/CSV/Detector/Options.php | 85 +++++++++++++++ .../CSV/Exception/CantDetectCSVOptions.php | 11 ++ .../src/Flow/ETL/Adapter/CSV/functions.php | 18 +++- .../CSV/Tests/Integration/CSVDetectorTest.php | 101 +++++++----------- .../CSV/Tests/Unit/Detector/OptionTest.php | 37 +++++++ 8 files changed, 320 insertions(+), 118 deletions(-) create mode 100644 src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/Detector/Option.php create mode 100644 src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/Detector/Options.php create mode 100644 src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/Exception/CantDetectCSVOptions.php create mode 100644 src/adapter/etl-adapter-csv/tests/Flow/ETL/Adapter/CSV/Tests/Unit/Detector/OptionTest.php diff --git a/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/CSVDetector.php b/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/CSVDetector.php index 693678520..8da757b54 100644 --- a/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/CSVDetector.php +++ b/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/CSVDetector.php @@ -4,26 +4,39 @@ namespace Flow\ETL\Adapter\CSV; +use Flow\ETL\Adapter\CSV\Detector\Option; +use Flow\ETL\Adapter\CSV\Detector\Options; +use Flow\ETL\Adapter\CSV\Exception\CantDetectCSVOptions; use Flow\ETL\Exception\InvalidArgumentException; -use Flow\ETL\Exception\RuntimeException; final class CSVDetector { + private ?Option $fallback; + + private Options $options; + + /** + * @var resource + */ private $resource; private int $startingPosition; /** - * @param $resource + * @param resource $resource */ - public function __construct($resource) + public function __construct($resource, ?Option $fallback = new Option(',', '"', '\\'), ?Options $options = null) { + /** @psalm-suppress DocblockTypeContradiction */ if (!\is_resource($resource)) { throw new InvalidArgumentException('Argument must be a valid resource'); } $this->resource = $resource; + /** @phpstan-ignore-next-line */ $this->startingPosition = \ftell($resource); + $this->options = $options ?? Options::all(); + $this->fallback = $fallback; } public function __destruct() @@ -32,69 +45,36 @@ public function __destruct() } /** - * @throws InvalidArgumentException - * @throws RuntimeException + * @throws CantDetectCSVOptions|InvalidArgumentException */ - public function separator(int $lines = 5) : string + public function detect(int $lines = 5) : Option { if ($lines < 1) { throw new InvalidArgumentException('Lines must be greater than 0'); } - $delimiters = [ - ',' => [], - "\t" => [], - ';' => [], - '|' => [], - ' ' => [], - '_' => [], - '-' => [], - ':' => [], - ]; - $readLines = 1; while ($line = \fgets($this->resource)) { - foreach ($delimiters as $delimiter => $count) { - $row = \str_getcsv($line, $delimiter); - $delimiters[$delimiter][] = \count($row); - } + $this->options->parse($line); if ($readLines++ >= $lines) { break; } } - foreach ($delimiters as $delimiter => $rows) { - $columnsCount = null; - - foreach ($rows as $rowColumns) { - if ($columnsCount === null) { - $columnsCount = $rowColumns; - } - - if ($columnsCount !== $rowColumns) { - unset($delimiters[$delimiter]); - - break; - } + try { + $bestOption = $this->options->onlyValid()->best(); + } catch (CantDetectCSVOptions $e) { + if ($this->fallback) { + return $this->fallback; } - } - - $delimiters = \array_map(fn (array $rows) : int => \array_sum($rows), $delimiters); - - \arsort($delimiters); - $delimiters = \array_filter($delimiters, fn (int $count) : bool => $count > $lines); - - if (!\count($delimiters)) { - \fseek($this->resource, $this->startingPosition); - - throw new RuntimeException('Cannot detect delimiter'); + throw $e; } - \fseek($this->resource, $this->startingPosition); + $this->options = $this->options->reset(); - return \array_key_first($delimiters); + return $bestOption; } } diff --git a/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/CSVExtractor.php b/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/CSVExtractor.php index 4abbab8a0..acbdd5903 100644 --- a/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/CSVExtractor.php +++ b/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/CSVExtractor.php @@ -29,8 +29,8 @@ public function __construct( private readonly bool $withHeader = true, private readonly bool $emptyToNull = true, private readonly string|null $separator = null, - private readonly string $enclosure = '"', - private readonly string $escape = '\\', + private readonly string|null $enclosure = null, + private readonly string|null $escape = null, private readonly int $charactersReadInLine = 1000 ) { $this->resetLimit(); @@ -43,15 +43,21 @@ public function extract(FlowContext $context) : \Generator foreach ($context->streams()->fs()->scan($this->path, $this->partitionFilter()) as $path) { $stream = $context->streams()->fs()->open($path, Mode::READ); + $option = \Flow\ETL\Adapter\CSV\csv_detect_separator($stream->resource()); + + $separator = $this->separator ?? $option->separator; + $enclosure = $this->enclosure ?? $option->enclosure; + $escape = $this->escape ?? $option->escape; + $headers = []; if ($this->withHeader && \count($headers) === 0) { /** @var array $headers */ - $headers = \fgetcsv($stream->resource(), $this->charactersReadInLine, $this->separator, $this->enclosure, $this->escape); + $headers = \fgetcsv($stream->resource(), $this->charactersReadInLine, $separator, $enclosure, $escape); } /** @var array $rowData */ - $rowData = \fgetcsv($stream->resource(), $this->charactersReadInLine, $this->separator, $this->enclosure, $this->escape); + $rowData = \fgetcsv($stream->resource(), $this->charactersReadInLine, $separator, $enclosure, $escape); if (!\count($headers)) { $headers = \array_map(fn (int $e) : string => 'e' . \str_pad((string) $e, 2, '0', STR_PAD_LEFT), \range(0, \count($rowData) - 1)); @@ -81,7 +87,7 @@ public function extract(FlowContext $context) : \Generator } if (\count($headers) !== \count($rowData)) { - $rowData = \fgetcsv($stream->resource(), $this->charactersReadInLine, $this->separator, $this->enclosure, $this->escape); + $rowData = \fgetcsv($stream->resource(), $this->charactersReadInLine, $separator, $enclosure, $escape); continue; } @@ -101,7 +107,7 @@ public function extract(FlowContext $context) : \Generator return; } - $rowData = \fgetcsv($stream->resource(), $this->charactersReadInLine, $this->separator, $this->enclosure, $this->escape); + $rowData = \fgetcsv($stream->resource(), $this->charactersReadInLine, $separator, $enclosure, $escape); } } diff --git a/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/Detector/Option.php b/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/Detector/Option.php new file mode 100644 index 000000000..56906b504 --- /dev/null +++ b/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/Detector/Option.php @@ -0,0 +1,94 @@ + + */ + private array $rows; + + public function __construct( + public string $separator, + public string $enclosure, + public string $escape = '\\' + ) { + if (\mb_strlen($this->separator) !== 1) { + throw new InvalidArgumentException('Separator must be a single character'); + } + + if (\mb_strlen($this->enclosure) !== 1) { + throw new InvalidArgumentException('Enclosure must be a single character'); + } + + $this->rows = []; + } + + public function isValid() : bool + { + $columnsCount = null; + + foreach ($this->rows as $row) { + if ($columnsCount === null) { + $columnsCount = \count($row); + + continue; + } + + if ($columnsCount !== \count($row)) { + return false; + } + } + + if ($columnsCount === 1) { + return false; + } + + return true; + } + + public function parse(string $line) : void + { + $this->rows[] = \str_getcsv($line, $this->separator, $this->enclosure); + } + + public function reset() : self + { + return new self($this->separator, $this->enclosure); + } + + public function score() : int + { + if (!$this->isValid()) { + return 0; + } + + if (!\count($this->rows)) { + return 0; + } + + $columnScore = \count($this->rows[0]) * self::COLUMN_SCORE_WEIGHT; + $totalLength = \array_reduce( + $this->rows, + static fn (int $carry, array $row) : int => $carry + \array_reduce( + $row, + static fn (int $carry, $column) : int => $carry + (\is_string($column) ? \mb_strlen($column) : 0), + 0 + ), + 0 + ); + + $lengthScore = (int) \round((1 / ($totalLength + 1) * self::COLUMNS_LENGTH_WEIGHT)); + + return $columnScore + $lengthScore; + } +} diff --git a/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/Detector/Options.php b/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/Detector/Options.php new file mode 100644 index 000000000..63eefb848 --- /dev/null +++ b/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/Detector/Options.php @@ -0,0 +1,85 @@ + + */ + private array $options; + + /** + * @param array