Skip to content

Commit

Permalink
Renamed threadSafe into appendSafe (#732)
Browse files Browse the repository at this point in the history
  • Loading branch information
norberttech authored Nov 5, 2023
1 parent 88420d9 commit f8279e6
Show file tree
Hide file tree
Showing 17 changed files with 94 additions and 136 deletions.
6 changes: 6 additions & 0 deletions UPGRADE.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ Affected extractors:
Argument `$rows_in_batch` was renamed to `$rows_per_page` which no longer determines the size of the batch, but the size of the page that will be fetched from Google API.
Rows are yielded one by one.

### 8) DataFrame::threadSafe() method was replaced by DataFrame::appendSafe()

`DataFrame::appendSafe()` is doing exactly the same thing as the old method, it's just more
descriptive and self-explanatory.
It's no longer mandatory to set this flat to true when using SaveMode::APPEND, it's now set automatically.

---

## Upgrading from 0.3.x to 0.4.x
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ private function writer(FlowContext $context) : \AvroDataIOWriter
$this->path,
'avro',
Mode::WRITE_BINARY,
$context->threadSafe()
$context->appendSafe()
)->resource()
),
new \AvroIODatumWriter($schema),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public function test_safe_writing_and_reading_avro_with_all_supported_types() :
}, \range(1, 100))
)
))
->threadSafe()
->appendSafe()
->write(Avro::to($path))
->run();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,14 @@ public function write(Rows $nextRows, array $headers, FlowContext $context, arra
if ($this->header && !$context->streams()->exists($this->path, $partitions)) {
$this->writeCSV(
$headers,
$context->streams()->open($this->path, 'csv', Mode::WRITE, $context->threadSafe(), $partitions)
$context->streams()->open($this->path, 'csv', Mode::WRITE, $context->appendSafe(), $partitions)
);
}

foreach ($nextRows as $row) {
$this->writeCSV(
$row->toArray(),
$context->streams()->open($this->path, 'csv', Mode::WRITE, $context->threadSafe(), $partitions)
$context->streams()->open($this->path, 'csv', Mode::WRITE, $context->appendSafe(), $partitions)
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,39 +16,6 @@

final class CSVLoaderTest extends TestCase
{
public function test_appending_to_csv_in_non_thread_safe() : void
{
$path = \rtrim(\sys_get_temp_dir(), '/') . '/' . \uniqid('flow_php_etl_csv_loader', true) . '.csv';
$this->expectExceptionMessage("Appending to destination \"file:/{$path}\" in non thread safe mode is not supported.");

(new Flow())
->process(
new Rows(
Row::create(Entry::integer('id', 1), Entry::string('name', 'Norbert')),
Row::create(Entry::integer('id', 2), Entry::string('name', 'Tomek')),
Row::create(Entry::integer('id', 3), Entry::string('name', 'Dawid')),
)
)
->load(CSV::to($path))
->run();

(new Flow())
->process(
new Rows(
Row::create(Entry::integer('id', 1), Entry::string('name', 'Norbert')),
Row::create(Entry::integer('id', 2), Entry::string('name', 'Tomek')),
Row::create(Entry::integer('id', 3), Entry::string('name', 'Dawid')),
)
)
->mode(SaveMode::Append)
->load(CSV::to($path))
->run();

if (\file_exists($path)) {
\unlink($path);
}
}

public function test_loading_array_entry() : void
{
$path = \sys_get_temp_dir() . '/' . \uniqid('flow_php_etl_csv_loader', true) . '.csv';
Expand All @@ -69,7 +36,7 @@ public function test_loading_array_entry() : void
}
}

public function test_loading_csv_files_with_threadsafe() : void
public function test_loading_csv_files_with_append_safe() : void
{
$path = \sys_get_temp_dir() . '/' . \uniqid('flow_php_etl_csv_loader', true) . '.csv';

Expand All @@ -81,7 +48,7 @@ public function test_loading_csv_files_with_threadsafe() : void
Row::create(new Row\Entry\IntegerEntry('id', 3), new Row\Entry\StringEntry('name', 'Dawid')),
)
)
->threadSafe()
->appendSafe()
->load(CSV::to($path))
->run();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,11 @@ public function write(Rows $nextRows, array $partitions, FlowContext $context) :
$streams = $context->streams();

if (!$streams->isOpen($this->path, $partitions)) {
$stream = $streams->open($this->path, 'json', $mode, $context->threadSafe(), $partitions);
$stream = $streams->open($this->path, 'json', $mode, $context->appendSafe(), $partitions);

$this->init($stream);
} else {
$stream = $streams->open($this->path, 'json', $mode, $context->threadSafe(), $partitions);
$stream = $streams->open($this->path, 'json', $mode, $context->appendSafe(), $partitions);
}

$this->writeJSON($nextRows, $stream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,39 +84,6 @@ public function test_json_loader_loading_empty_string() : void
}
}

public function test_json_loader_with_a_thread_safe_and_append_mode() : void
{
$stream = \rtrim(\sys_get_temp_dir(), '/') . '/' . \uniqid('flow_php_etl_json_loader', true) . '.json';

\file_put_contents($stream, '[]');

$loader = new JsonLoader(Path::realpath($stream));

$this->expectException(RuntimeException::class);
$this->expectExceptionMessage("Appending to existing single file destination \"file:/{$stream}\" is not supported.");

(new Flow())
->process(
new Rows(
...\array_map(
fn (int $i) : Row => Row::create(
new Row\Entry\IntegerEntry('id', $i),
new Row\Entry\StringEntry('name', 'name_' . $i)
),
\range(0, 5)
)
)
)
->mode(SaveMode::Append)
->threadSafe()
->load($loader)
->run();

if (\file_exists($stream)) {
\unlink($stream);
}
}

public function test_json_loader_with_a_thread_safe_and_overwrite() : void
{
$stream = \sys_get_temp_dir() . '/' . \uniqid('flow_php_etl_json_loader', true) . '.json';
Expand All @@ -133,7 +100,7 @@ public function test_json_loader_with_a_thread_safe_and_overwrite() : void
\range(0, 5)
)
),
($context = new FlowContext(Config::default()))->setThreadSafe()
($context = new FlowContext(Config::default()))->setAppendSafe()
);

$loader->load(
Expand All @@ -146,7 +113,7 @@ public function test_json_loader_with_a_thread_safe_and_overwrite() : void
\range(6, 10)
)
),
$context = ($context)->setMode(SaveMode::Overwrite)->setThreadSafe()
$context = ($context)->setMode(SaveMode::Overwrite)->setAppendSafe()
);

$loader->closure($context);
Expand Down Expand Up @@ -176,4 +143,36 @@ public function test_json_loader_with_a_thread_safe_and_overwrite() : void
\unlink($stream . DIRECTORY_SEPARATOR . $files[0]);
}
}

public function test_json_loader_with_append_mode() : void
{
$stream = \rtrim(\sys_get_temp_dir(), '/') . '/' . \uniqid('flow_php_etl_json_loader', true) . '.json';

\file_put_contents($stream, '[]');

$loader = new JsonLoader(Path::realpath($stream));

$this->expectException(RuntimeException::class);
$this->expectExceptionMessage("Appending to existing single file destination \"file:/{$stream}\" is not supported.");

(new Flow())
->process(
new Rows(
...\array_map(
fn (int $i) : Row => Row::create(
new Row\Entry\IntegerEntry('id', $i),
new Row\Entry\StringEntry('name', 'name_' . $i)
),
\range(0, 5)
)
)
)
->mode(SaveMode::Append)
->load($loader)
->run();

if (\file_exists($stream)) {
\unlink($stream);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public function load(Rows $rows, FlowContext $context) : void
if ($context->partitionEntries()->count()) {
foreach ($rows->partitionBy(...$context->partitionEntries()->all()) as $partitions) {

$stream = $streams->open($this->path, 'parquet', Mode::WRITE_BINARY, $context->threadSafe(), $partitions->partitions);
$stream = $streams->open($this->path, 'parquet', Mode::WRITE_BINARY, $context->appendSafe(), $partitions->partitions);

if (!\array_key_exists($stream->path()->uri(), $this->writers)) {
$this->writers[$stream->path()->uri()] = new Writer(
Expand All @@ -98,7 +98,7 @@ public function load(Rows $rows, FlowContext $context) : void
}
}
} else {
$stream = $streams->open($this->path, 'parquet', Mode::WRITE_BINARY, $context->threadSafe());
$stream = $streams->open($this->path, 'parquet', Mode::WRITE_BINARY, $context->appendSafe());

if (!\array_key_exists($stream->path()->uri(), $this->writers)) {
$this->writers[$stream->path()->uri()] = new Writer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public function load(Rows $rows, FlowContext $context) : void
}

\fwrite(
$context->streams()->open($this->path, 'text', Mode::WRITE, $context->threadSafe(), $partition->partitions)->resource(),
$context->streams()->open($this->path, 'text', Mode::WRITE, $context->appendSafe(), $partition->partitions)->resource(),
$row->entries()->all()[0]->toString() . $this->newLineSeparator
);
}
Expand All @@ -75,7 +75,7 @@ public function load(Rows $rows, FlowContext $context) : void
}

\fwrite(
$context->streams()->open($this->path, 'text', Mode::WRITE, $context->threadSafe(), [])->resource(),
$context->streams()->open($this->path, 'text', Mode::WRITE, $context->appendSafe(), [])->resource(),
$row->entries()->all()[0]->toString() . $this->newLineSeparator
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

final class TextLoaderTest extends TestCase
{
public function test_loading_text_files_with_thread_safe() : void
public function test_loading_text_files_with_append_safe() : void
{
$path = \sys_get_temp_dir() . '/' . \uniqid('flow_php_etl_csv_loader', true) . '.csv';

Expand All @@ -27,7 +27,7 @@ public function test_loading_text_files_with_thread_safe() : void
)
)
->load(Text::to($path))
->threadSafe()
->appendSafe()
->run();

$files = \array_values(\array_diff(\scandir($path), ['..', '.']));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public function process(Rows $rows) : Rows
* each worker would remove all files created by other workers.
*/
->mode(SaveMode::Append)
->threadSafe();
->appendSafe();

if ([] !== $this->partitionEntries) {
$dataFrame = $dataFrame->partitionBy(...$this->partitionEntries);
Expand Down
28 changes: 21 additions & 7 deletions src/core/etl/src/Flow/ETL/DataFrame.php
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,19 @@ public function aggregate(Aggregation ...$aggregations) : self
return $this;
}

/**
* @lazy
* When set to true, files are never written under the origin name but instead initial path is turned
* into a folder in which each process writes to a new file.
* This is also mandatory for SaveMode::Append
*/
public function appendSafe(bool $appendSafe = true) : self
{
$this->context->setAppendSafe($appendSafe);

return $this;
}

/**
* Merge/Split Rows yielded by Extractor into batches of given size.
* For example, when Extractor is yielding one row at time, this method will merge them into batches of given size
Expand Down Expand Up @@ -439,6 +452,10 @@ public function mode(SaveMode $mode) : self
{
$this->context->setMode($mode);

if ($mode === SaveMode::Append) {
$this->appendSafe();
}

return $this;
}

Expand Down Expand Up @@ -653,16 +670,13 @@ public function sortBy(EntryReference ...$entries) : self
}

/**
* @deprecated please use DataFrame::appendSafe() instead
*
* @lazy
* When set to true, files are never written under the origin name but instead initial path is turned
* into a folder in which each process writes to a new file.
* Otherwise parallel processing would not be possible due to a single file bottleneck.
* In a single process pipelines there is not much added value from this setting unless
* there is a chance that the same pipeline execution might overlap.
*/
public function threadSafe(bool $threadSafe = true) : self
public function threadSafe(bool $appendSafe = true) : self
{
$this->context->setThreadSafe($threadSafe);
$this->appendSafe($appendSafe);

return $this;
}
Expand Down
26 changes: 13 additions & 13 deletions src/core/etl/src/Flow/ETL/FlowContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
*/
final class FlowContext
{
private bool $appendSafe = false;

private ErrorHandler $errorHandler;

private SaveMode $mode = SaveMode::ExceptionIfExists;
Expand All @@ -28,15 +30,18 @@ final class FlowContext

private References $partitions;

private bool $threadSafe = false;

public function __construct(public readonly Config $config)
{
$this->partitionFilter = new NoopFilter();
$this->errorHandler = new ThrowError();
$this->partitions = new References();
}

public function appendSafe() : bool
{
return $this->appendSafe;
}

public function cache() : Cache
{
return $this->config->cache();
Expand Down Expand Up @@ -86,23 +91,23 @@ public function serializer() : Serializer
return $this->config->serializer();
}

public function setErrorHandler(ErrorHandler $handler) : self
public function setAppendSafe(bool $appendSafe = true) : self
{
$this->errorHandler = $handler;
$this->appendSafe = $appendSafe;

return $this;
}

public function setMode(SaveMode $mode) : self
public function setErrorHandler(ErrorHandler $handler) : self
{
$this->mode = $mode;
$this->errorHandler = $handler;

return $this;
}

public function setThreadSafe(bool $threadSafe = true) : self
public function setMode(SaveMode $mode) : self
{
$this->threadSafe = $threadSafe;
$this->mode = $mode;

return $this;
}
Expand All @@ -111,9 +116,4 @@ public function streams() : FilesystemStreams
{
return $this->config->filesystemStreams();
}

public function threadSafe() : bool
{
return $this->threadSafe;
}
}
Loading

0 comments on commit f8279e6

Please sign in to comment.