Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CSV options detection #918

Merged
merged 2 commits into from
Jan 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Adapter\CSV;

use Flow\ETL\Adapter\CSV\Detector\Option;
use Flow\ETL\Adapter\CSV\Detector\Options;
use Flow\ETL\Adapter\CSV\Exception\CantDetectCSVOptions;
use Flow\ETL\Exception\InvalidArgumentException;

/**
 * Detects the CSV options (separator, enclosure, escape) of an opened stream.
 *
 * Lines read from the resource are fed to every candidate Option; the valid
 * candidate with the highest score wins. The stream position recorded when the
 * detector is constructed is restored when the detector is destroyed.
 */
final class CSVDetector
{
    private ?Option $fallback;

    private Options $options;

    /**
     * @var resource
     */
    private $resource;

    private int $startingPosition;

    /**
     * @param resource $resource stream to read sample lines from
     * @param null|Option $fallback option returned when detection fails; null makes detect() rethrow instead
     * @param null|Options $options candidates to evaluate, Options::all() when null
     *
     * @throws InvalidArgumentException when the argument is not a resource or its position can't be read
     */
    public function __construct($resource, ?Option $fallback = new Option(',', '"', '\\'), ?Options $options = null)
    {
        /** @psalm-suppress DocblockTypeContradiction */
        if (!\is_resource($resource)) {
            throw new InvalidArgumentException('Argument must be a valid resource');
        }

        // ftell() returns false on failure; assigning that to an int property
        // would surface as a TypeError, so report it as an invalid argument.
        $position = \ftell($resource);

        if ($position === false) {
            throw new InvalidArgumentException('Unable to determine the current position of the resource');
        }

        $this->resource = $resource;
        $this->startingPosition = $position;
        $this->options = $options ?? Options::all();
        $this->fallback = $fallback;
    }

    /**
     * Moves the stream back to the position it had when the detector was created.
     */
    public function __destruct()
    {
        // The caller may have closed the stream already; seeking a closed
        // resource would raise an error from inside the destructor.
        if (\is_resource($this->resource)) {
            \fseek($this->resource, $this->startingPosition);
        }
    }

    /**
     * Reads up to $lines lines from the resource and returns the best matching option.
     *
     * @param int $lines maximum number of lines to sample, must be at least 1
     *
     * @throws CantDetectCSVOptions|InvalidArgumentException
     */
    public function detect(int $lines = 5) : Option
    {
        if ($lines < 1) {
            throw new InvalidArgumentException('Lines must be greater than 0');
        }

        for ($readLines = 0; $readLines < $lines; $readLines++) {
            $line = \fgets($this->resource);

            // Compare against false explicitly: a final line such as "0" without
            // a trailing newline is falsy but is still data.
            if ($line === false) {
                break;
            }

            $this->options->parse($line);
        }

        try {
            return $this->options->onlyValid()->best();
        } catch (CantDetectCSVOptions $e) {
            if ($this->fallback !== null) {
                return $this->fallback;
            }

            throw $e;
        } finally {
            // Always start the next detect() call from clean candidates, also
            // when this call ended with the fallback or an exception.
            $this->options = $this->options->reset();
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ public function __construct(
private readonly Path $path,
private readonly bool $withHeader = true,
private readonly bool $emptyToNull = true,
private readonly string $separator = ',',
private readonly string $enclosure = '"',
private readonly string $escape = '\\',
private readonly string|null $separator = null,
private readonly string|null $enclosure = null,
private readonly string|null $escape = null,
private readonly int $charactersReadInLine = 1000
) {
$this->resetLimit();
Expand All @@ -43,15 +43,21 @@ public function extract(FlowContext $context) : \Generator
foreach ($context->streams()->fs()->scan($this->path, $this->partitionFilter()) as $path) {
$stream = $context->streams()->fs()->open($path, Mode::READ);

$option = \Flow\ETL\Adapter\CSV\csv_detect_separator($stream->resource());

$separator = $this->separator ?? $option->separator;
$enclosure = $this->enclosure ?? $option->enclosure;
$escape = $this->escape ?? $option->escape;

$headers = [];

if ($this->withHeader && \count($headers) === 0) {
/** @var array<string> $headers */
$headers = \fgetcsv($stream->resource(), $this->charactersReadInLine, $this->separator, $this->enclosure, $this->escape);
$headers = \fgetcsv($stream->resource(), $this->charactersReadInLine, $separator, $enclosure, $escape);
}

/** @var array<mixed> $rowData */
$rowData = \fgetcsv($stream->resource(), $this->charactersReadInLine, $this->separator, $this->enclosure, $this->escape);
$rowData = \fgetcsv($stream->resource(), $this->charactersReadInLine, $separator, $enclosure, $escape);

if (!\count($headers)) {
$headers = \array_map(fn (int $e) : string => 'e' . \str_pad((string) $e, 2, '0', STR_PAD_LEFT), \range(0, \count($rowData) - 1));
Expand Down Expand Up @@ -81,7 +87,7 @@ public function extract(FlowContext $context) : \Generator
}

if (\count($headers) !== \count($rowData)) {
$rowData = \fgetcsv($stream->resource(), $this->charactersReadInLine, $this->separator, $this->enclosure, $this->escape);
$rowData = \fgetcsv($stream->resource(), $this->charactersReadInLine, $separator, $enclosure, $escape);

continue;
}
Expand All @@ -101,7 +107,7 @@ public function extract(FlowContext $context) : \Generator
return;
}

$rowData = \fgetcsv($stream->resource(), $this->charactersReadInLine, $this->separator, $this->enclosure, $this->escape);
$rowData = \fgetcsv($stream->resource(), $this->charactersReadInLine, $separator, $enclosure, $escape);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Adapter\CSV\Detector;

use Flow\ETL\Exception\InvalidArgumentException;

/**
 * One candidate combination of separator, enclosure and escape character.
 *
 * Lines passed to parse() are split with this combination and kept, so that
 * isValid() and score() can judge how well the combination fits the sample.
 */
final class Option
{
    private const COLUMN_SCORE_WEIGHT = 100_000;

    private const COLUMNS_LENGTH_WEIGHT = 10_000;

    /**
     * Rows produced by parse(), one entry per parsed line.
     *
     * @var array<int, array<int, null|string>>
     */
    private array $rows;

    /**
     * @throws InvalidArgumentException when separator or enclosure is not a single character,
     *                                  or escape is longer than a single byte
     */
    public function __construct(
        public string $separator,
        public string $enclosure,
        public string $escape = '\\'
    ) {
        // NOTE(review): mb_strlen accepts one multibyte character, while str_getcsv
        // requires a single-byte separator/enclosure - confirm whether byte length
        // should be checked here instead.
        if (\mb_strlen($this->separator) !== 1) {
            throw new InvalidArgumentException('Separator must be a single character');
        }

        if (\mb_strlen($this->enclosure) !== 1) {
            throw new InvalidArgumentException('Enclosure must be a single character');
        }

        // The escape is handed to str_getcsv in parse(), which only accepts an
        // empty string or one byte; fail here rather than on the first parsed line.
        if (\strlen($this->escape) > 1) {
            throw new InvalidArgumentException('Escape must be empty or a single character');
        }

        $this->rows = [];
    }

    /**
     * Tells whether every parsed row has the same number of columns and that number is not 1.
     *
     * Returns true when no row has been parsed yet.
     */
    public function isValid() : bool
    {
        $columnsCount = null;

        foreach ($this->rows as $row) {
            if ($columnsCount === null) {
                $columnsCount = \count($row);

                continue;
            }

            if ($columnsCount !== \count($row)) {
                return false;
            }
        }

        // A single column means the separator never split the line.
        return $columnsCount !== 1;
    }

    /**
     * Splits the line with this option's separator, enclosure and escape and stores the result.
     */
    public function parse(string $line) : void
    {
        // Pass the escape explicitly so the sample is parsed with the same
        // characters this option advertises, not with the function default.
        $this->rows[] = \str_getcsv($line, $this->separator, $this->enclosure, $this->escape);
    }

    /**
     * Returns a copy of this option without any parsed rows.
     */
    public function reset() : self
    {
        // Carry the escape over as well, otherwise a custom escape would be
        // replaced by the constructor default.
        return new self($this->separator, $this->enclosure, $this->escape);
    }

    /**
     * Scores how well this option fits the parsed rows; higher is better.
     *
     * Returns 0 when the option is invalid or nothing was parsed. Otherwise the score is
     * the column count of the first row multiplied by COLUMN_SCORE_WEIGHT, plus a bonus
     * that shrinks as the total character length of all string cells grows.
     */
    public function score() : int
    {
        if (!$this->isValid()) {
            return 0;
        }

        if (!\count($this->rows)) {
            return 0;
        }

        $columnScore = \count($this->rows[0]) * self::COLUMN_SCORE_WEIGHT;
        $totalLength = \array_reduce(
            $this->rows,
            static fn (int $carry, array $row) : int => $carry + \array_reduce(
                $row,
                static fn (int $carry, $column) : int => $carry + (\is_string($column) ? \mb_strlen($column) : 0),
                0
            ),
            0
        );

        // +1 keeps the division defined when every cell is empty.
        $lengthScore = (int) \round((1 / ($totalLength + 1) * self::COLUMNS_LENGTH_WEIGHT));

        return $columnScore + $lengthScore;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Adapter\CSV\Detector;

use Flow\ETL\Adapter\CSV\Exception\CantDetectCSVOptions;

/**
 * Collection of candidate Option objects evaluated together during detection.
 */
final class Options
{
    /**
     * @var array<Option>
     */
    private array $options;

    /**
     * @param array<Option> $options
     */
    public function __construct(array $options)
    {
        $this->options = $options;
    }

    /**
     * Builds the collection of every listed separator combined with every listed enclosure.
     */
    public static function all() : self
    {
        $separators = [',', "\t", ';', '|', ' ', '_', '-', ':', '~', '@', '#', '$', '%', '^', '&', '*', '(', ')', '+', '=', '?', '!', '\\', '/', '.', '>', '<'];
        $enclosures = ['"', "'"];

        $options = [];

        foreach ($separators as $separator) {
            foreach ($enclosures as $enclosure) {
                $options[] = new Option($separator, $enclosure);
            }
        }

        return new self($options);
    }

    /**
     * Returns the option with the highest score; on a tie the earliest option wins.
     *
     * @throws CantDetectCSVOptions when the collection is empty
     */
    public function best() : Option
    {
        $best = null;
        $bestScore = 0;

        foreach ($this->options as $option) {
            // Score each candidate once and remember the leader's score, rather
            // than recomputing the leader's score on every comparison.
            $score = $option->score();

            if ($best === null || $score > $bestScore) {
                $best = $option;
                $bestScore = $score;
            }
        }

        if ($best === null) {
            throw new CantDetectCSVOptions('No best option found');
        }

        return $best;
    }

    /**
     * Returns a new collection holding only the options whose isValid() is true.
     */
    public function onlyValid() : self
    {
        return new self(\array_filter($this->options, static fn (Option $option) : bool => $option->isValid()));
    }

    /**
     * Passes the line to every option in the collection.
     */
    public function parse(string $line) : void
    {
        foreach ($this->options as $option) {
            $option->parse($line);
        }
    }

    /**
     * Returns a new collection built from the reset() copy of every option.
     */
    public function reset() : self
    {
        $options = [];

        foreach ($this->options as $option) {
            $options[] = $option->reset();
        }

        return new self($options);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Adapter\CSV\Exception;

use Flow\ETL\Exception\RuntimeException;

/**
 * Thrown when no candidate CSV option can be selected as the best one.
 */
final class CantDetectCSVOptions extends RuntimeException
{
}
19 changes: 16 additions & 3 deletions src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
namespace Flow\ETL\Adapter\CSV;

use function Flow\ETL\DSL\from_all;
use Flow\ETL\Adapter\CSV\Detector\Option;
use Flow\ETL\Adapter\CSV\Detector\Options;
use Flow\ETL\Extractor;
use Flow\ETL\Filesystem\Path;
use Flow\ETL\Loader;
Expand All @@ -16,9 +18,9 @@ function from_csv(
string|Path|array $path,
bool $with_header = true,
bool $empty_to_null = true,
string $delimiter = ',',
string $enclosure = '"',
string $escape = '\\',
string|null $delimiter = null,
string|null $enclosure = null,
string|null $escape = null,
int $characters_read_in_line = 1000
) : Extractor {
if (\is_array($path)) {
Expand Down Expand Up @@ -67,3 +69,14 @@ function to_csv(
$new_line_separator
);
}

/**
 * Detects the CSV options of the given resource by delegating to CSVDetector.
 *
 * @param resource $resource - valid resource to CSV file opened with 'r' mode
 * @param int<1, max> $lines - how many lines to read from the CSV file, 5 by default; reading more lines is more accurate but slower
 * @param null|Option $fallback - option returned when no best option can be detected, Option(',', '"', '\\') by default
 * @param null|Options $options - candidate options used for detection, Options::all() by default
 */
function csv_detect_separator($resource, int $lines = 5, ?Option $fallback = new Option(',', '"', '\\'), ?Options $options = null) : Option
{
    $detector = new CSVDetector($resource, $fallback, $options);

    return $detector->detect($lines);
}
Loading