Skip to content

Commit

Permalink
Covered LogicalPlan FilesystemProcessor with tests (#714)
Browse files Browse the repository at this point in the history
  • Loading branch information
norberttech committed Nov 2, 2023
1 parent 77b2d9e commit 261015b
Show file tree
Hide file tree
Showing 9 changed files with 214 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
use Flow\ETL\FlowContext;
use Flow\ETL\Row;

final class AvroExtractor implements Extractor
final class AvroExtractor implements Extractor, Extractor\FileExtractor
{
public function __construct(
private readonly Path $path,
Expand Down Expand Up @@ -60,4 +60,9 @@ public function extract(FlowContext $context) : \Generator
yield array_to_rows($rows, $context->entryFactory());
}
}

public function source() : Path
{
return $this->path;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
use Flow\ETL\FlowContext;
use Flow\ETL\Row;

final class CSVExtractor implements Extractor
final class CSVExtractor implements Extractor, Extractor\FileExtractor
{
/**
* @param int<0, max> $charactersReadInLine
Expand Down Expand Up @@ -108,4 +108,9 @@ public function extract(FlowContext $context) : \Generator
}
}
}

public function source() : Path
{
return $this->uri;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
use JsonMachine\Items;
use JsonMachine\JsonDecoder\ExtJsonDecoder;

final class JsonExtractor implements Extractor
final class JsonExtractor implements Extractor, Extractor\FileExtractor
{
public function __construct(
private readonly Path $path,
Expand Down Expand Up @@ -53,6 +53,11 @@ public function extract(FlowContext $context) : \Generator
}
}

public function source() : Path
{
return $this->path;
}

/**
* @return array{pointer?: string, decoder: ExtJsonDecoder}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public function test_json_loader_with_a_thread_safe_and_append_mode() : void
$loader = new JsonLoader(Path::realpath($stream));

$this->expectException(RuntimeException::class);
$this->expectExceptionMessage("Appending to existing single file destination \"file:/{$stream}\" in non thread safe mode is not supported.");
$this->expectExceptionMessage("Appending to existing single file destination \"file:/{$stream}\" is not supported.");

(new Flow())
->process(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
use Flow\Parquet\ParquetFile;
use Flow\Parquet\Reader;

final class ParquetExtractor implements Extractor
final class ParquetExtractor implements Extractor, Extractor\FileExtractor
{
/**
* @param Path $path
Expand Down Expand Up @@ -53,6 +53,11 @@ public function extract(FlowContext $context) : \Generator
}
}

public function source() : Path
{
return $this->path;
}

/**
* @return \Generator<int, array{file: ParquetFile, uri: string}>
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
use Flow\ETL\FlowContext;
use Flow\ETL\Row;

final class TextExtractor implements Extractor
final class TextExtractor implements Extractor, Extractor\FileExtractor
{
public function __construct(
private readonly Path $path,
Expand Down Expand Up @@ -57,4 +57,9 @@ public function extract(FlowContext $context) : \Generator
}
}
}

public function source() : Path
{
return $this->path;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
use Flow\ETL\Row;
use Flow\ETL\Rows;

final class XMLReaderExtractor implements Extractor
final class XMLReaderExtractor implements Extractor, Extractor\FileExtractor
{
/**
* In order to iterate only over <element> nodes us root/elements/element.
Expand Down Expand Up @@ -96,4 +96,9 @@ public function extract(FlowContext $context) : \Generator
}
}
}

public function source() : Path
{
return $this->path;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public function process(LogicalPlan $plan, FlowContext $context) : LogicalPlan
}

if ($context->streams()->fs()->fileExists($loader->destination())) {
throw new RuntimeException('Appending to existing single file destination "' . $loader->destination()->uri() . '" in non thread safe mode is not supported.');
throw new RuntimeException('Appending to existing single file destination "' . $loader->destination()->uri() . '" is not supported.');
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
<?php declare(strict_types=1);

namespace Flow\ETL\Tests\Integration\Pipeline\Execution\Processor;

use Flow\ETL\Config;
use Flow\ETL\DSL\CSV;
use Flow\ETL\DSL\From;
use Flow\ETL\Exception\RuntimeException;
use Flow\ETL\Filesystem\SaveMode;
use Flow\ETL\Flow;
use Flow\ETL\FlowContext;
use Flow\ETL\Pipeline\Execution\LogicalPlan;
use Flow\ETL\Pipeline\Execution\Processor\FilesystemProcessor;
use Flow\ETL\Pipeline\Pipes;
use PHPUnit\Framework\TestCase;

final class FilesystemProcessorTest extends TestCase
{
public function test_append_mode_to_a_single_file() : void
{
$path = \sys_get_temp_dir() . '/flow-etl-filesystem-processor-test-overwrite-mode.csv';

if (\file_exists($path)) {
\unlink($path);
}

(new Flow())
->read(From::array([['id' => 1], ['id' => 2]]))
->write(CSV::to($path))
->run();

$processor = new FilesystemProcessor();
$extractor = CSV::from($path);

$this->expectException(RuntimeException::class);
$this->expectExceptionMessage("Appending to existing single file destination \"file:/{$path}\" is not supported.");

$processor->process(
new LogicalPlan(
$extractor,
new Pipes([CSV::to($path)])
),
(new FlowContext(Config::default()))
->setThreadSafe()
->setMode(SaveMode::Append)
);
}

public function test_append_mode_without_thread_safe() : void
{
$path = \sys_get_temp_dir() . '/flow-etl-filesystem-processor-test-overwrite-mode.csv';

if (\file_exists($path)) {
\unlink($path);
}

(new Flow())
->read(From::array([['id' => 1], ['id' => 2]]))
->write(CSV::to($path))
->run();

$processor = new FilesystemProcessor();
$extractor = CSV::from($path);

$this->expectException(RuntimeException::class);
$this->expectExceptionMessage("Appending to destination \"file:/{$path}\" in non thread safe mode is not supported");

$processor->process(
new LogicalPlan(
$extractor,
new Pipes([CSV::to($path)])
),
(new FlowContext(Config::default()))->setMode(SaveMode::Append)
);
}

public function test_exception_if_exists_mode() : void
{
$path = \sys_get_temp_dir() . '/flow-etl-filesystem-processor-test-overwrite-mode.csv';

if (\file_exists($path)) {
\unlink($path);
}

(new Flow())
->read(From::array([['id' => 1], ['id' => 2]]))
->write(CSV::to($path))
->run();

$processor = new FilesystemProcessor();
$extractor = CSV::from($path);

$this->expectException(RuntimeException::class);
$this->expectExceptionMessage("Destination path \"file:/{$path}\" already exists, please change path to different or set different SaveMode");

$processor->process(
new LogicalPlan(
$extractor,
new Pipes([CSV::to($path)])
),
(new FlowContext(Config::default()))->setMode(SaveMode::ExceptionIfExists)
);
}

public function test_ignore_mode() : void
{
$path = \sys_get_temp_dir() . '/flow-etl-filesystem-processor-test-overwrite-mode.csv';

if (\file_exists($path)) {
\unlink($path);
}

(new Flow())
->read(From::array([['id' => 1], ['id' => 2]]))
->write(CSV::to($path))
->run();

$processor = new FilesystemProcessor();
$extractor = CSV::from($path);

$plan = $processor->process(
new LogicalPlan(
$extractor,
new Pipes([CSV::to($path)])
),
(new FlowContext(Config::default()))->setMode(SaveMode::Ignore)
);

$this->assertFileExists($path);
$this->assertCount(0, $plan->pipes->all());
}

public function test_overwrite_mode() : void
{
$path = \sys_get_temp_dir() . '/flow-etl-filesystem-processor-test-overwrite-mode.csv';

if (\file_exists($path)) {
\unlink($path);
}

(new Flow())
->read(From::array([['id' => 1], ['id' => 2]]))
->write(CSV::to($path))
->run();

$processor = new FilesystemProcessor();
$extractor = CSV::from($path);

$processor->process(
new LogicalPlan(
$extractor,
new Pipes([CSV::to($path)])
),
(new FlowContext(Config::default()))->setMode(SaveMode::Overwrite)
);

$this->assertFileDoesNotExist($path);
}

public function test_throwing_exception_when_extractor_source_does_not_exists() : void
{
$processor = new FilesystemProcessor();
$extractor = CSV::from('/not_existing_file.csv');

$this->expectException(RuntimeException::class);
$this->expectExceptionMessage('Not existing path used to extract data: file://not_existing_file.csv');

$processor->process(
new LogicalPlan(
$extractor,
Pipes::empty()
),
(new FlowContext(Config::default()))
);
}
}

0 comments on commit 261015b

Please sign in to comment.