Skip to content

Commit

Permalink
Pipeline Optimization - LimitOptimizer (#730)
Browse files Browse the repository at this point in the history
Added Pipeline Optimizer with LimitOptimization
  • Loading branch information
norberttech authored Nov 5, 2023
1 parent 7fce95f commit 7e25a3e
Show file tree
Hide file tree
Showing 39 changed files with 500 additions and 293 deletions.
7 changes: 7 additions & 0 deletions src/core/etl/src/Flow/ETL/Config.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

use Flow\ETL\Filesystem\FilesystemStreams;
use Flow\ETL\Pipeline\Execution\Processors;
use Flow\ETL\Pipeline\Optimizer;
use Flow\ETL\Row\EntryFactory;
use Flow\Serializer\Serializer;

Expand All @@ -26,6 +27,7 @@ public function __construct(
private readonly Serializer $serializer,
private readonly FilesystemStreams $filesystemStreams,
private readonly Processors $processors,
private readonly Optimizer $optimizer,
private readonly bool $putInputIntoRows,
private readonly EntryFactory $entryFactory
) {
Expand Down Expand Up @@ -66,6 +68,11 @@ public function id() : string
return $this->id;
}

public function optimizer() : Optimizer
{
return $this->optimizer;
}

public function processors() : Processors
{
return $this->processors;
Expand Down
4 changes: 4 additions & 0 deletions src/core/etl/src/Flow/ETL/ConfigBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use Flow\ETL\Monitoring\Memory\Unit;
use Flow\ETL\Pipeline\Execution\Processor\FilesystemProcessor;
use Flow\ETL\Pipeline\Execution\Processors;
use Flow\ETL\Pipeline\Optimizer;
use Flow\ETL\Row\Factory\NativeEntryFactory;
use Flow\Serializer\CompressingSerializer;
use Flow\Serializer\Serializer;
Expand Down Expand Up @@ -68,6 +69,9 @@ public function build() : Config
new Processors(
new FilesystemProcessor()
),
new Optimizer(
new Optimizer\LimitOptimization()
),
$this->putInputIntoRows,
new NativeEntryFactory()
);
Expand Down
4 changes: 2 additions & 2 deletions src/core/etl/src/Flow/ETL/DataFrame.php
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ public function joinEach(DataFrameFactory $factory, Expression $on, string|Join
*/
public function limit(int $limit) : self
{
$this->pipeline->add(new LimitTransformer($limit));
$this->pipeline = $this->context->config->optimizer()->optimize(new LimitTransformer($limit), $this->pipeline);

return $this;
}
Expand Down Expand Up @@ -647,7 +647,7 @@ public function sortBy(EntryReference ...$entries) : self
->run();

$this->pipeline = $this->pipeline->cleanCopy();
$this->pipeline->source($this->context->config->externalSort()->sortBy(...$entries));
$this->pipeline->setSource($this->context->config->externalSort()->sortBy(...$entries));

return $this;
}
Expand Down
5 changes: 5 additions & 0 deletions src/core/etl/src/Flow/ETL/Extractor/Limitable.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ public function countRow() : void
$this->yieldedRows++;
}

public function isLimited() : bool
{
return $this->limit !== null;
}

public function limit() : ?int
{
return $this->limit;
Expand Down
2 changes: 2 additions & 0 deletions src/core/etl/src/Flow/ETL/Extractor/LimitableExtractor.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,6 @@
interface LimitableExtractor
{
public function changeLimit(int $limit) : void;

public function isLimited() : bool;
}
4 changes: 2 additions & 2 deletions src/core/etl/src/Flow/ETL/Flow.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ public static function setUp(ConfigBuilder|Config $config) : self
public function extract(Extractor $extractor) : DataFrame
{
return new DataFrame(
(new SynchronousPipeline())->source($extractor),
(new SynchronousPipeline())->setSource($extractor),
$this->config
);
}

public function process(Rows ...$rows) : DataFrame
{
return new DataFrame(
(new SynchronousPipeline())->source(new ProcessExtractor(...$rows)),
(new SynchronousPipeline())->setSource(new ProcessExtractor(...$rows)),
$this->config
);
}
Expand Down
8 changes: 7 additions & 1 deletion src/core/etl/src/Flow/ETL/Pipeline.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

namespace Flow\ETL;

use Flow\ETL\Pipeline\Pipes;

/**
* @internal
*/
Expand All @@ -22,10 +24,14 @@ public function has(string $transformerClass) : bool;

public function isAsync() : bool;

public function pipes() : Pipes;

/**
* @return \Generator<Rows>
*/
public function process(FlowContext $context) : \Generator;

public function source(Extractor $extractor) : self;
public function setSource(Extractor $extractor) : self;

public function source() : Extractor;
}
16 changes: 13 additions & 3 deletions src/core/etl/src/Flow/ETL/Pipeline/BatchingPipeline.php
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,14 @@ public function isAsync() : bool
return $this->pipeline->isAsync();
}

public function pipes() : Pipes
{
return $this->pipeline->pipes()->merge($this->nextPipeline->pipes());
}

public function process(FlowContext $context) : \Generator
{
$this->nextPipeline->source(
$this->nextPipeline->setSource(
From::chunks_from(
From::pipeline($this->pipeline),
$this->size
Expand All @@ -73,10 +78,15 @@ public function process(FlowContext $context) : \Generator
return $this->nextPipeline->process($context);
}

public function source(Extractor $extractor) : self
public function setSource(Extractor $extractor) : self
{
$this->pipeline->source($extractor);
$this->pipeline->setSource($extractor);

return $this;
}

public function source() : Extractor
{
return $this->pipeline->source();
}
}
14 changes: 12 additions & 2 deletions src/core/etl/src/Flow/ETL/Pipeline/CachingPipeline.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ public function isAsync() : bool
return false;
}

public function pipes() : Pipes
{
return $this->pipeline->pipes();
}

public function process(FlowContext $context) : \Generator
{
$context->config->cache()->clear($id = $this->id ?: $context->config->id());
Expand All @@ -53,10 +58,15 @@ public function process(FlowContext $context) : \Generator
}
}

public function source(Extractor $extractor) : Pipeline
public function setSource(Extractor $extractor) : Pipeline
{
$this->pipeline->source($extractor);
$this->pipeline->setSource($extractor);

return $this;
}

public function source() : Extractor
{
return $this->pipeline->source();
}
}
16 changes: 13 additions & 3 deletions src/core/etl/src/Flow/ETL/Pipeline/CollectingPipeline.php
Original file line number Diff line number Diff line change
Expand Up @@ -57,19 +57,29 @@ public function isAsync() : bool
return $this->pipeline->isAsync();
}

public function pipes() : Pipes
{
return $this->pipeline->pipes()->merge($this->nextPipeline->pipes());
}

public function process(FlowContext $context) : \Generator
{
$this->nextPipeline->source(From::rows(
$this->nextPipeline->setSource(From::rows(
(new Rows())->merge(...\iterator_to_array($this->pipeline->process($context)))
));

return $this->nextPipeline->process($context);
}

public function source(Extractor $extractor) : self
public function setSource(Extractor $extractor) : self
{
$this->pipeline->source($extractor);
$this->pipeline->setSource($extractor);

return $this;
}

public function source() : Extractor
{
return $this->pipeline->source();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
use Flow\ETL\Pipeline\Execution\Plan\FilesystemOperations;
use Flow\ETL\Pipeline\Pipes;

final class LogicalPlan
final class ExecutionPlan
{
public function __construct(public readonly Extractor $extractor, public readonly Pipes $pipes)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@
use Flow\ETL\Filesystem\SaveMode;
use Flow\ETL\FlowContext;
use Flow\ETL\Loader\FileLoader;
use Flow\ETL\Pipeline\Execution\LogicalPlan;
use Flow\ETL\Pipeline\Execution\ExecutionPlan;
use Flow\ETL\Pipeline\Pipes;

final class FilesystemProcessor implements Processor
{
public function process(LogicalPlan $plan, FlowContext $context) : LogicalPlan
public function process(ExecutionPlan $plan, FlowContext $context) : ExecutionPlan
{
$operations = $plan->filesystemOperations();

Expand Down Expand Up @@ -76,6 +76,6 @@ public function process(LogicalPlan $plan, FlowContext $context) : LogicalPlan
}
}

return new LogicalPlan($plan->extractor, $newPipes);
return new ExecutionPlan($plan->extractor, $newPipes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
namespace Flow\ETL\Pipeline\Execution\Processor;

use Flow\ETL\FlowContext;
use Flow\ETL\Pipeline\Execution\LogicalPlan;
use Flow\ETL\Pipeline\Execution\ExecutionPlan;

interface Processor
{
public function process(LogicalPlan $plan, FlowContext $context) : LogicalPlan;
public function process(ExecutionPlan $plan, FlowContext $context) : ExecutionPlan;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public function __construct(Processor ...$processors)
$this->processors = $processors;
}

public function process(LogicalPlan $plan, FlowContext $context) : LogicalPlan
public function process(ExecutionPlan $plan, FlowContext $context) : ExecutionPlan
{
$processedPlan = $plan;

Expand Down
16 changes: 13 additions & 3 deletions src/core/etl/src/Flow/ETL/Pipeline/GroupByPipeline.php
Original file line number Diff line number Diff line change
Expand Up @@ -52,23 +52,33 @@ public function isAsync() : bool
return $this->pipeline->isAsync();
}

public function pipes() : Pipes
{
return $this->pipeline->pipes()->merge($this->nextPipeline->pipes());
}

public function process(FlowContext $context) : \Generator
{
foreach ($this->pipeline->process($context) as $nextRows) {
$this->groupBy->group($nextRows);
}

$this->nextPipeline->source(new Extractor\ProcessExtractor($this->groupBy->result($context)));
$this->nextPipeline->setSource(new Extractor\ProcessExtractor($this->groupBy->result($context)));

foreach ($this->nextPipeline->process($context) as $nextRows) {
yield $nextRows;
}
}

public function source(Extractor $extractor) : self
public function setSource(Extractor $extractor) : self
{
$this->pipeline->source($extractor);
$this->pipeline->setSource($extractor);

return $this;
}

public function source() : Extractor
{
return $this->pipeline->source();
}
}
82 changes: 0 additions & 82 deletions src/core/etl/src/Flow/ETL/Pipeline/Limiter.php

This file was deleted.

Loading

0 comments on commit 7e25a3e

Please sign in to comment.