Skip to content

Commit

Permalink
Display partitions (#877)
Browse files Browse the repository at this point in the history
* Display partition when displaying rows

* Small additions to DSL
  • Loading branch information
norberttech authored Dec 13, 2023
1 parent 94778b3 commit f0ef0da
Show file tree
Hide file tree
Showing 25 changed files with 913 additions and 544 deletions.
18 changes: 18 additions & 0 deletions UPGRADE.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,24 @@ Please follow the instructions for your specific version to ensure a smooth upgr

---

## Upgrading from 0.5.x to 0.6.x

### 1) Rows::merge() accepts single instance of Rows

Before:

```php
Rows::merge(Rows ...$rows) : Rows
```

After:

```php
Rows::merge(Rows $rows) : Rows
```

---

## Upgrading from 0.4.x to 0.5.x

### 1) Entry factory moved from extractors to `FlowContext`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
use function Flow\ETL\DSL\float_entry;
use function Flow\ETL\DSL\from_array;
use function Flow\ETL\DSL\from_rows;
use function Flow\ETL\DSL\ignore;
use function Flow\ETL\DSL\int_entry;
use function Flow\ETL\DSL\json_entry;
use function Flow\ETL\DSL\json_object_entry;
Expand Down Expand Up @@ -331,7 +332,7 @@ public function test_writing_twice_to_the_same_location_with_ignore_mode() : voi
}, \range(1, 100))
)
))
->mode(SaveMode::Ignore)
->saveMode(ignore())
->write(to_avro($path))
->run();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public function load(Rows $rows, FlowContext $context) : void

if ($context->partitionEntries()->count()) {
foreach ($rows->partitionBy(...$context->partitionEntries()->all()) as $partitionedRows) {
$this->write($partitionedRows, $headers, $context, $partitionedRows->partitions());
$this->write($partitionedRows, $headers, $context, $partitionedRows->partitions()->toArray());
}
} else {
$this->write($rows, $headers, $context, []);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

use function Flow\ETL\Adapter\CSV\from_csv;
use function Flow\ETL\Adapter\CSV\to_csv;
use function Flow\ETL\DSL\df;
use function Flow\ETL\DSL\from_array;
use function Flow\ETL\DSL\ref;
use Flow\ETL\Adapter\CSV\CSVExtractor;
Expand Down Expand Up @@ -115,7 +116,7 @@ public function test_extracting_csv_files_with_header() : void
{
$path = __DIR__ . '/../Fixtures/annual-enterprise-survey-2019-financial-year-provisional-csv.csv';

$rows = (new Flow())
$rows = df()
->read(from_csv($path))
->fetch();

Expand Down Expand Up @@ -166,7 +167,7 @@ public function test_extracting_csv_files_without_header() : void

public function test_extracting_csv_with_corrupted_row() : void
{
$rows = (new Flow())
$rows = df()
->extract(from_csv(__DIR__ . '/../Fixtures/corrupted_row.csv'))
->fetch();

Expand Down Expand Up @@ -221,7 +222,7 @@ public function test_extracting_csv_with_more_than_1000_characters_per_line_spli
{
$this->assertCount(
2,
(new Flow())
df()
->read(from_csv(__DIR__ . '/../Fixtures/more_than_1000_characters_per_line.csv'))
->fetch()
->toArray(),
Expand All @@ -233,7 +234,7 @@ public function test_extracting_csv_with_more_than_1000_characters_per_line_with
{
$this->assertCount(
1,
(new Flow())
df()
->read(from_csv(__DIR__ . '/../Fixtures/more_than_1000_characters_per_line.csv', characters_read_in_line: 2000))
->fetch()
->toArray(),
Expand All @@ -249,7 +250,7 @@ public function test_limit() : void
\unlink($path);
}

(new Flow())->read(from_array([['id' => 1], ['id' => 2], ['id' => 3], ['id' => 4], ['id' => 5]]))
df()->read(from_array([['id' => 1], ['id' => 2], ['id' => 3], ['id' => 4], ['id' => 5]]))
->write(to_csv($path))
->run();

Expand All @@ -275,7 +276,7 @@ public function test_loading_data_from_all_partitions() : void
['group' => '2', 'id' => 7, 'value' => 'g'],
['group' => '2', 'id' => 8, 'value' => 'h'],
],
(new Flow())
df()
->read(from_csv(__DIR__ . '/../Fixtures/partitioned/group=*/*.csv'))
->withEntry('id', ref('id')->cast('int'))
->sortBy(ref('id'))
Expand Down Expand Up @@ -314,7 +315,7 @@ public function test_signal_stop() : void
\unlink($path);
}

(new Flow())->read(from_array([['id' => 1], ['id' => 2], ['id' => 3], ['id' => 4], ['id' => 5]]))
df()->read(from_array([['id' => 1], ['id' => 2], ['id' => 3], ['id' => 4], ['id' => 5]]))
->write(to_csv($path))
->run();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
use function Flow\ETL\Adapter\CSV\to_csv;
use function Flow\ETL\DSL\array_entry;
use function Flow\ETL\DSL\int_entry;
use function Flow\ETL\DSL\overwrite;
use function Flow\ETL\DSL\str_entry;
use Flow\ETL\Filesystem\Path;
use Flow\ETL\Filesystem\SaveMode;
use Flow\ETL\Flow;
use Flow\ETL\Row;
use Flow\ETL\Rows;
Expand Down Expand Up @@ -207,7 +207,7 @@ public function test_loading_overwrite_csv() : void
Row::create(int_entry('id', 3), str_entry('name', 'Dawid')),
)
)
->mode(SaveMode::Overwrite)
->saveMode(overwrite())
->load(to_csv($path))
->run();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public function load(Rows $rows, FlowContext $context) : void
{
if ($context->partitionEntries()->count()) {
foreach ($rows->partitionBy(...$context->partitionEntries()->all()) as $partitionedRows) {
$this->write($partitionedRows, $partitionedRows->partitions(), $context);
$this->write($partitionedRows, $partitionedRows->partitions()->toArray(), $context);
}
} else {
$this->write($rows, [], $context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
use function Flow\ETL\Adapter\Json\from_json;
use function Flow\ETL\Adapter\Json\to_json;
use function Flow\ETL\DSL\df;
use function Flow\ETL\DSL\exception_if_exists;
use function Flow\ETL\DSL\from_array;
use function Flow\ETL\DSL\ignore;
use function Flow\ETL\DSL\ref;
use Flow\ETL\Adapter\JSON\JsonLoader;
use Flow\ETL\Config;
Expand Down Expand Up @@ -259,7 +261,7 @@ public function test_save_mode_throw_exception_on_partitioned_rows() : void
['id' => 5, 'partition' => 'b'],
]))
->partitionBy(ref('partition'))
->mode(SaveMode::ExceptionIfExists)
->saveMode(exception_if_exists())
->write(to_json($path))
->run();

Expand Down Expand Up @@ -291,7 +293,7 @@ public function test_save_with_ignore_mode() : void
['id' => 2],
['id' => 3],
]))
->mode(SaveMode::Ignore)
->saveMode(ignore())
->write(to_json($path))
->run();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public function load(Rows $rows, FlowContext $context) : void
if ($context->partitionEntries()->count()) {
foreach ($rows->partitionBy(...$context->partitionEntries()->all()) as $partitionedRows) {

$stream = $streams->open($this->path, 'parquet', $context->appendSafe(), $partitionedRows->partitions());
$stream = $streams->open($this->path, 'parquet', $context->appendSafe(), $partitionedRows->partitions()->toArray());

if (!\array_key_exists($stream->path()->uri(), $this->writers)) {
$this->writers[$stream->path()->uri()] = new Writer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public function load(Rows $rows, FlowContext $context) : void
}

\fwrite(
$context->streams()->open($this->path, 'text', $context->appendSafe(), $partitionedRows->partitions())->resource(),
$context->streams()->open($this->path, 'text', $context->appendSafe(), $partitionedRows->partitions()->toArray())->resource(),
$row->entries()->all()[0]->toString() . $this->newLineSeparator
);
}
Expand Down
36 changes: 36 additions & 0 deletions src/core/etl/src/Flow/ETL/DSL/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use Flow\ETL\ErrorHandler\SkipRows;
use Flow\ETL\ErrorHandler\ThrowError;
use Flow\ETL\Extractor;
use Flow\ETL\Filesystem\SaveMode;
use Flow\ETL\Filesystem\Stream\Mode;
use Flow\ETL\Flow;
use Flow\ETL\FlowContext;
Expand Down Expand Up @@ -431,6 +432,21 @@ function rows(Row ...$row) : Rows
return new Rows(...$row);
}

function partition(string $name, string $value) : Partition
{
return new Partition($name, $value);
}

function partitions(Partition ...$partition) : \Flow\ETL\Partitions
{
return new \Flow\ETL\Partitions(...$partition);
}

function rows_partitioned(array $rows, array $partitions) : Rows
{
return Rows::partitioned($rows, new \Flow\ETL\Partitions(...$partitions));
}

function col(string $entry) : EntryReference
{
return new EntryReference($entry);
Expand Down Expand Up @@ -973,3 +989,23 @@ function config_builder() : ConfigBuilder
{
return new ConfigBuilder();
}

function overwrite() : SaveMode
{
return SaveMode::Overwrite;
}

function ignore() : SaveMode
{
return SaveMode::Ignore;
}

function exception_if_exists() : SaveMode
{
return SaveMode::ExceptionIfExists;
}

function append() : SaveMode
{
return SaveMode::Append;
}
44 changes: 29 additions & 15 deletions src/core/etl/src/Flow/ETL/DataFrame.php
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,16 @@ public function crossJoin(self $dataFrame, string $prefix = '') : self
#[DSLMethod(exclude: true)]
public function display(int $limit = 20, int|bool $truncate = 20, Formatter $formatter = new AsciiTableFormatter()) : string
{
return $formatter->format($this->fetch($limit), $truncate);
$clone = clone $this;
$clone->limit($limit);

$output = '';

foreach ($clone->pipeline->process($clone->context) as $rows) {
$output .= $formatter->format($rows, $truncate);
}

return $output;
}

/**
Expand Down Expand Up @@ -305,35 +314,30 @@ public function dropDuplicates(string|Reference ...$entries) : self
public function fetch(?int $limit = null) : Rows
{
$clone = clone $this;
$clone->collect();

if ($limit !== null) {
$clone->limit($limit);
}

if ($clone->context->partitionEntries()->count()) {
$rows = (new Rows())->merge(
...\iterator_to_array($clone->pipeline->process($clone->context))
);

$fetchedRows = (new Rows());
$rows = new Rows();

foreach ($rows->partitionBy(...$clone->context->partitionEntries()->all()) as $partitionedRows) {
if ($clone->context->partitionFilter()->keep(...$partitionedRows->partitions())) {
$fetchedRows = $fetchedRows->merge($partitionedRows);
foreach ($clone->pipeline->process($clone->context) as $nextRows) {
if ($clone->context->partitionFilter()->keep(...$nextRows->partitions()->toArray())) {
$rows = $rows->merge($nextRows);
}
}

return $fetchedRows;
return $rows;
}

$rows = \iterator_to_array($clone->pipeline->process($clone->context));
$rows = new Rows();

if (!\count($rows)) {
return new Rows();
foreach ($clone->pipeline->process($clone->context) as $nextRows) {
$rows = $rows->merge($nextRows);
}

return $rows[0];
return $rows;
}

/**
Expand Down Expand Up @@ -740,6 +744,16 @@ public function run(?callable $callback = null) : void
}
}

/**
* Alias for DataFrame::mode.
*
* @lazy
*/
public function saveMode(SaveMode $mode) : self
{
return $this->mode($mode);
}

/**
* @lazy
* Keep only given entries.
Expand Down
9 changes: 9 additions & 0 deletions src/core/etl/src/Flow/ETL/Formatter/ASCII/ASCIIBody.php
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,15 @@ public function print(int|bool $truncate = 20) : string
$buffer .= '-' . \str_repeat('-', $length) . '-+';
}

if (\count($this->body->partitions())) {
$buffer .= PHP_EOL;
$buffer .= 'Partitions:';

foreach ($this->body->partitions() as $partition) {
$buffer .= PHP_EOL . ' - ' . $partition->name . '=' . $partition->value;
}
}

return $buffer;
}
}
9 changes: 9 additions & 0 deletions src/core/etl/src/Flow/ETL/Formatter/ASCII/Body.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Flow\ETL\Formatter\ASCII;

use Flow\ETL\Exception\InvalidArgumentException;
use Flow\ETL\Partition;
use Flow\ETL\Row;
use Flow\ETL\Rows;

Expand Down Expand Up @@ -32,6 +33,14 @@ public function maximumLength(string $entry, int|bool $truncate = 20) : int
return $max;
}

/**
* @return Partition[]
*/
public function partitions() : array
{
return $this->rows->partitions()->toArray();
}

/**
* @return array<Row>
*/
Expand Down
Loading

0 comments on commit f0ef0da

Please sign in to comment.