Skip to content

Commit

Permalink
Move handling of SaveMode to FilesystemStreams (#810)
Browse files Browse the repository at this point in the history
* Removed PartitionedRows in favor of keeping partitions in Rows

* Move SaveMode to FilesystemStreams

* Added missing tests

* Fixed failing tests

* Improved FilesystemStreams tests
  • Loading branch information
norberttech authored Nov 21, 2023
1 parent c316406 commit 29c3e0f
Show file tree
Hide file tree
Showing 37 changed files with 636 additions and 920 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ public function extract(FlowContext $context) : \Generator
$shouldPutInputIntoRows = $context->config->shouldPutInputIntoRows();

foreach ($context->streams()->fs()->scan($this->path, $context->partitionFilter()) as $filePath) {
$partitions = $filePath->partitions();

$reader = new \AvroDataIOReader(
new AvroResource(
$context->streams()->fs()->open(
Expand All @@ -47,7 +49,7 @@ public function extract(FlowContext $context) : \Generator
$row['_input_file_uri'] = $filePath->uri();
}

$signal = yield array_to_rows($row, $context->entryFactory());
$signal = yield array_to_rows($row, $context->entryFactory(), $partitions);
$this->countRow();

if ($signal === Signal::STOP || $this->reachedLimit()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

use Flow\ETL\Exception\RuntimeException;
use Flow\ETL\Filesystem\Path;
use Flow\ETL\Filesystem\Stream\Mode;
use Flow\ETL\FlowContext;
use Flow\ETL\Loader;
use Flow\ETL\Loader\Closure;
Expand Down Expand Up @@ -156,7 +155,6 @@ private function writer(FlowContext $context) : \AvroDataIOWriter
$context->streams()->open(
$this->path,
'avro',
Mode::WRITE_BINARY,
$context->appendSafe()
)->resource()
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public function closure(FlowContext $context) : void
$context->streams()->rm($this->output);
}

$stream = $context->streams()->open($this->output, 'html', Mode::WRITE, false);
$stream = $context->streams()->open($this->output, 'html', false);

$templateStream = $context->streams()->fs()->open($this->template, Mode::READ);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public function extract(FlowContext $context) : \Generator
$shouldPutInputIntoRows = $context->config->shouldPutInputIntoRows();

foreach ($context->streams()->fs()->scan($this->path, $context->partitionFilter()) as $path) {
$partitions = $path->partitions();
$stream = $context->streams()->fs()->open($path, Mode::READ);

$headers = [];
Expand Down Expand Up @@ -89,7 +90,7 @@ public function extract(FlowContext $context) : \Generator
$row['_input_file_uri'] = $stream->path()->uri();
}

$signal = yield array_to_rows($row, $context->entryFactory());
$signal = yield array_to_rows($row, $context->entryFactory(), $partitions);
$this->countRow();

if ($signal === Signal::STOP || $this->reachedLimit()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
use Flow\ETL\Exception\RuntimeException;
use Flow\ETL\Filesystem\Path;
use Flow\ETL\Filesystem\Stream\FileStream;
use Flow\ETL\Filesystem\Stream\Mode;
use Flow\ETL\FlowContext;
use Flow\ETL\Loader;
use Flow\ETL\Loader\Closure;
Expand Down Expand Up @@ -81,8 +80,8 @@ public function load(Rows $rows, FlowContext $context) : void
$headers = $rows->first()->entries()->map(fn (Entry $entry) => $entry->name());

if ($context->partitionEntries()->count()) {
foreach ($rows->partitionBy(...$context->partitionEntries()->all()) as $partition) {
$this->write($partition->rows, $headers, $context, $partition->partitions);
foreach ($rows->partitionBy(...$context->partitionEntries()->all()) as $partitionedRows) {
$this->write($partitionedRows, $headers, $context, $partitionedRows->partitions());
}
} else {
$this->write($rows, $headers, $context, []);
Expand All @@ -97,14 +96,14 @@ public function write(Rows $nextRows, array $headers, FlowContext $context, arra
if ($this->header && !$context->streams()->isOpen($this->path, $partitions)) {
$this->writeCSV(
$headers,
$context->streams()->open($this->path, 'csv', Mode::WRITE, $context->appendSafe(), $partitions)
$context->streams()->open($this->path, 'csv', $context->appendSafe(), $partitions)
);
}

foreach ($nextRows as $row) {
$this->writeCSV(
$row->toArray(),
$context->streams()->open($this->path, 'csv', Mode::WRITE, $context->appendSafe(), $partitions)
$context->streams()->open($this->path, 'csv', $context->appendSafe(), $partitions)
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public function extract(FlowContext $context) : \Generator
$shouldPutInputIntoRows = $context->config->shouldPutInputIntoRows();

foreach ($context->streams()->fs()->scan($this->path, $context->partitionFilter()) as $filePath) {
$partitions = $filePath->partitions();

/**
* @var array|object $rowData
*/
Expand All @@ -42,7 +44,7 @@ public function extract(FlowContext $context) : \Generator
$row['_input_file_uri'] = $filePath->uri();
}

$signal = yield array_to_rows($row, $context->entryFactory());
$signal = yield array_to_rows($row, $context->entryFactory(), $partitions);
$this->countRow();

if ($signal === Signal::STOP || $this->reachedLimit()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
use Flow\ETL\Exception\RuntimeException;
use Flow\ETL\Filesystem\Path;
use Flow\ETL\Filesystem\Stream\FileStream;
use Flow\ETL\Filesystem\Stream\Mode;
use Flow\ETL\FlowContext;
use Flow\ETL\Loader;
use Flow\ETL\Loader\Closure;
Expand Down Expand Up @@ -63,7 +62,7 @@ public function load(Rows $rows, FlowContext $context) : void
{
if ($context->partitionEntries()->count()) {
foreach ($rows->partitionBy(...$context->partitionEntries()->all()) as $partitionedRows) {
$this->write($partitionedRows->rows, $partitionedRows->partitions, $context);
$this->write($partitionedRows, $partitionedRows->partitions(), $context);
}
} else {
$this->write($rows, [], $context);
Expand All @@ -75,15 +74,14 @@ public function load(Rows $rows, FlowContext $context) : void
*/
public function write(Rows $nextRows, array $partitions, FlowContext $context) : void
{
$mode = Mode::WRITE;
$streams = $context->streams();

if (!$streams->isOpen($this->path, $partitions)) {
$stream = $streams->open($this->path, 'json', $mode, $context->appendSafe(), $partitions);
$stream = $streams->open($this->path, 'json', $context->appendSafe(), $partitions);

$this->init($stream);
} else {
$stream = $streams->open($this->path, 'json', $mode, $context->appendSafe(), $partitions);
$stream = $streams->open($this->path, 'json', $context->appendSafe(), $partitions);
}

$this->writeJSON($nextRows, $stream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@

namespace Flow\ETL\Adapter\JSON\Tests\Integration;

use function Flow\ETL\DSL\ref;
use Flow\ETL\Adapter\JSON\JsonLoader;
use Flow\ETL\Config;
use Flow\ETL\DSL\From;
use Flow\ETL\DSL\Json;
use Flow\ETL\Exception\RuntimeException;
use Flow\ETL\Filesystem\Path;
use Flow\ETL\Filesystem\SaveMode;
use Flow\ETL\Flow;
Expand Down Expand Up @@ -113,7 +114,7 @@ public function test_json_loader_with_a_thread_safe_and_overwrite() : void
\range(6, 10)
)
),
$context = ($context)->setMode(SaveMode::Overwrite)->setAppendSafe()
$context = $context->setAppendSafe()
);

$loader->closure($context);
Expand Down Expand Up @@ -144,35 +145,171 @@ public function test_json_loader_with_a_thread_safe_and_overwrite() : void
}
}

public function test_json_loader_with_append_mode() : void
public function test_save_mode_ignore_on_partitioned_rows() : void
{
$stream = \rtrim(\sys_get_temp_dir(), '/') . '/' . \uniqid('flow_php_etl_json_loader', true) . '.json';
$path = \sys_get_temp_dir() . '/' . \uniqid('flow_php_etl_json_loader_ignore_mode', true);

\file_put_contents($stream, '[]');
if (\file_exists($path)) {
\unlink($path);
}

$loader = new JsonLoader(Path::realpath($stream));
(new Flow)
->read(From::array([
['id' => 1, 'partition' => 'a'],
['id' => 2, 'partition' => 'a'],
['id' => 3, 'partition' => 'a'],
['id' => 4, 'partition' => 'b'],
['id' => 5, 'partition' => 'b'],
]))
->partitionBy(ref('partition'))
->mode(SaveMode::Overwrite)
->write(Json::to($path))
->run();

$this->expectException(RuntimeException::class);
$this->expectExceptionMessage("Appending to existing single file destination \"file:/{$stream}\" is not supported.");
(new Flow)
->read(From::array([
['id' => 8, 'partition' => 'b'],
['id' => 10, 'partition' => 'b'],
]))
->partitionBy(ref('partition'))
->mode(SaveMode::Ignore)
->write(Json::to($path))
->run();

(new Flow())
->process(
new Rows(
...\array_map(
fn (int $i) : Row => Row::create(
new Row\Entry\IntegerEntry('id', $i),
new Row\Entry\StringEntry('name', 'name_' . $i)
),
\range(0, 5)
)
)
)
->mode(SaveMode::Append)
->load($loader)
$this->assertSame(
[
['id' => 1, 'partition' => 'a'],
['id' => 2, 'partition' => 'a'],
['id' => 3, 'partition' => 'a'],
['id' => 4, 'partition' => 'b'],
['id' => 5, 'partition' => 'b'],
],
(new Flow())
->read(Json::from($path))
->fetch()
->toArray()
);
}

public function test_save_mode_overwrite_on_partitioned_rows() : void
{
$path = \sys_get_temp_dir() . '/' . \uniqid('flow_php_etl_json_loader_ignore_mode', true);

if (\file_exists($path)) {
\unlink($path);
}

(new Flow)
->read(From::array([
['id' => 1, 'partition' => 'a'],
['id' => 2, 'partition' => 'a'],
['id' => 3, 'partition' => 'a'],
['id' => 4, 'partition' => 'b'],
['id' => 5, 'partition' => 'b'],
]))
->partitionBy(ref('partition'))
->mode(SaveMode::Overwrite)
->write(Json::to($path))
->run();

if (\file_exists($stream)) {
\unlink($stream);
(new Flow)
->read(From::array([
['id' => 8, 'partition' => 'b'],
['id' => 10, 'partition' => 'b'],
]))
->partitionBy(ref('partition'))
->mode(SaveMode::Overwrite)
->write(Json::to($path))
->run();

$this->assertSame(
[
['id' => 1, 'partition' => 'a'],
['id' => 2, 'partition' => 'a'],
['id' => 3, 'partition' => 'a'],
['id' => 8, 'partition' => 'b'],
['id' => 10, 'partition' => 'b'],
],
(new Flow())
->read(Json::from($path))
->fetch()
->toArray()
);
}

public function test_save_mode_throw_exception_on_partitioned_rows() : void
{
$path = \sys_get_temp_dir() . '/' . \uniqid('flow_php_etl_json_loader_exception_mode', true);

if (\file_exists($path)) {
\unlink($path);
}

(new Flow)
->read(From::array([
['id' => 1, 'partition' => 'a'],
['id' => 2, 'partition' => 'a'],
['id' => 3, 'partition' => 'a'],
['id' => 4, 'partition' => 'b'],
['id' => 5, 'partition' => 'b'],
]))
->partitionBy(ref('partition'))
->mode(SaveMode::ExceptionIfExists)
->write(Json::to($path))
->run();

$this->expectExceptionMessage('Destination path "file:/' . $path . '/partition=b" already exists, please change path to different or set different SaveMode');

(new Flow)
->read(From::array([
['id' => 8, 'partition' => 'b'],
['id' => 10, 'partition' => 'b'],
]))
->partitionBy(ref('partition'))
->mode(SaveMode::ExceptionIfExists)
->write(Json::to($path))
->run();

}

public function test_save_with_ignore_mode() : void
{
$path = \sys_get_temp_dir() . '/' . \uniqid('flow_php_etl_json_loader_ignore_mode', true) . '.json';

if (\file_exists($path)) {
\unlink($path);
}

(new Flow)
->read(From::array([
['id' => 1],
['id' => 2],
['id' => 3],
]))
->mode(SaveMode::Ignore)
->write(Json::to($path))
->run();

(new Flow)
->read(From::array([
['id' => 4],
['id' => 5],
['id' => 6],
]))
->mode(SaveMode::Ignore)
->write(Json::to($path))
->run();

$this->assertSame(
[
['id' => 1],
['id' => 2],
['id' => 3],
],
(new Flow)
->read(Json::from($path))
->fetch()
->toArray()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use Flow\ETL\Filesystem\Path;
use Flow\ETL\Filesystem\Stream\Mode;
use Flow\ETL\FlowContext;
use Flow\ETL\Partition;
use Flow\Parquet\ByteOrder;
use Flow\Parquet\Options;
use Flow\Parquet\ParquetFile;
Expand Down Expand Up @@ -43,7 +44,7 @@ public function extract(FlowContext $context) : \Generator
$row['_input_file_uri'] = $fileData['uri'];
}

$signal = yield array_to_rows($row, $context->entryFactory());
$signal = yield array_to_rows($row, $context->entryFactory(), $fileData['partitions']);

$this->countRow();

Expand All @@ -64,7 +65,7 @@ public function source() : Path
}

/**
* @return \Generator<int, array{file: ParquetFile, uri: string}>
* @return \Generator<int, array{file: ParquetFile, uri: string, partitions: array<Partition>}>
*/
private function readers(FlowContext $context) : \Generator
{
Expand All @@ -76,6 +77,7 @@ private function readers(FlowContext $context) : \Generator
))
->readStream($context->streams()->fs()->open($filePath, Mode::READ)->resource()),
'uri' => $filePath->uri(),
'partitions' => $filePath->partitions(),
];
}
}
Expand Down
Loading

0 comments on commit 29c3e0f

Please sign in to comment.