Skip to content

Commit

Permalink
Adjust Rows::chunks() to work on generators instead of arrays (#656)
Browse files Browse the repository at this point in the history
* Adjust `Rows::chunks()` to work on generators instead of arrays

* Add benchmark for `Rows::chunks()`

* Add `GeneratorExtractor`

* Use `building_blocks` group name for the benchmark

* Validate content of `GeneratorExtractor`
  • Loading branch information
stloyd committed Oct 29, 2023
1 parent 4fa95a4 commit 6241e8a
Show file tree
Hide file tree
Showing 20 changed files with 127 additions and 57 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/test-benchmark.yml
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,10 @@ jobs:
echo ' '
echo '</details>'
echo ' '
echo '<details><summary>Entry Factory</summary>'
echo '<details><summary>Building Blocks</summary>'
echo ' '
echo '```shell'
composer test:benchmark -- --ref=1.x --progress=none --group=entry_factory
composer test:benchmark -- --ref=1.x --progress=none --group=building_blocks
echo '```'
echo ' '
echo '</details>'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
/**
* @implements Loader<array{
* table_name: string,
* chunk_size: int,
* chunk_size: int<1, max>,
* connection_params: array<string, mixed>,
* operation: string,
* operation_options: array{
Expand All @@ -35,8 +35,7 @@ final class DbalLoader implements Loader
private string $operation;

/**
* @param string $tableName
* @param int $chunkSize
* @param int<1, max> $chunkSize
* @param array<string, mixed> $connectionParams
* @param array{
* skip_conflicts?: boolean,
Expand All @@ -45,7 +44,6 @@ final class DbalLoader implements Loader
* update_columns?: array<string>,
* primary_key_columns?: array<string>
* } $operationOptions
* @param string $operation
*
* @throws InvalidArgumentException
*/
Expand All @@ -66,17 +64,14 @@ public function __construct(
* Since Connection::getParams() is marked as an internal method, please
* use this constructor with caution.
*
* @param Connection $connection
* @param string $tableName
* @param int $chunkSize
* @param int<1, max> $chunkSize
* @param array{
* skip_conflicts?: boolean,
* constraint?: string,
* conflict_columns?: array<string>,
* update_columns?: array<string>,
* primary_key_columns?: array<string>
* } $operationOptions
* @param string $operation
*
* @throws InvalidArgumentException
*/
Expand Down
10 changes: 2 additions & 8 deletions src/adapter/etl-adapter-doctrine/src/Flow/ETL/DSL/Dbal.php
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ final public static function from_limit_offset(
* @param int $page_size
* @param null|int $maximum
*
* @throws InvalidArgumentException
*
* @return Extractor
*/
final public static function from_limit_offset_qb(
Expand Down Expand Up @@ -135,8 +133,7 @@ final public static function from_query(

/**
* @param array<string, mixed>|Connection $connection
* @param string $table
* @param int $chunk_size
* @param int<1, max> $chunk_size
* @param array{
* skip_conflicts?: boolean,
* constraint?: string,
Expand All @@ -146,8 +143,6 @@ final public static function from_query(
* } $options
*
* @throws InvalidArgumentException
*
* @return Loader
*/
final public static function to_table_insert(
array|Connection $connection,
Expand All @@ -162,8 +157,7 @@ final public static function to_table_insert(

/**
* @param array<string, mixed>|Connection $connection
* @param string $table
* @param int $chunk_size
* @param int<1, max> $chunk_size
* @param array{
* skip_conflicts?: boolean,
* constraint?: string,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
* elasticMetaHeader?: boolean,
* includePortInHostHeader?: boolean
* },
* chunk_size: int,
* chunk_size: int<1, max>,
* index: string,
* id_factory: IdFactory,
* parameters: array<mixed>,
Expand All @@ -41,6 +41,7 @@ final class ElasticsearchLoader implements Loader

/**
* @param array{hosts?: array<string>, connectionParams?: array<mixed>, retries?: int, sniffOnStart?: boolean, sslCert?: array<string>, sslKey?: array<string>, sslVerification?: (boolean|string), elasticMetaHeader?: boolean, includePortInHostHeader?: boolean} $config
* @param int<1, max> $chunkSize
* @param array<mixed> $parameters
*/
public function __construct(
Expand All @@ -66,6 +67,7 @@ public function __construct(
* elasticMetaHeader?: boolean,
* includePortInHostHeader?: boolean
* } $clientConfig
* @param int<1, max> $chunkSize
* @param array<mixed> $parameters
*/
public static function update(array $clientConfig, int $chunkSize, string $index, IdFactory $idFactory, array $parameters = []) : self
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class Elasticsearch
* elasticMetaHeader?: boolean,
* includePortInHostHeader?: boolean
* } $config
* @param int $chunk_size
* @param int<1, max> $chunk_size
* @param string $index
* @param IdFactory $id_factory
* @param array<mixed> $parameters - https://www.elastic.co/guide/en/elasticsearch/reference/master/docs-bulk.html
Expand Down Expand Up @@ -60,7 +60,7 @@ final public static function bulk_index(
* elasticMetaHeader?: boolean,
* includePortInHostHeader?: boolean
* } $config
* @param int $chunk_size
* @param int<1, max> $chunk_size
* @param string $index
* @param IdFactory $id_factory
* @param array<mixed> $parameters - https://www.elastic.co/guide/en/elasticsearch/reference/master/docs-bulk.html
Expand Down
6 changes: 6 additions & 0 deletions src/core/etl/src/Flow/ETL/DSL/From.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ final public static function array(array $array, int $batch_size = 100) : Extrac
return new MemoryExtractor(new ArrayMemory($array), $batch_size);
}

/**
* @param int<1, max> $max_row_size
*/
final public static function buffer(Extractor $extractor, int $max_row_size) : Extractor
{
return new Extractor\BufferExtractor($extractor, $max_row_size);
Expand All @@ -50,6 +53,9 @@ final public static function chain(Extractor ...$extractors) : Extractor
return new Extractor\ChainExtractor(...$extractors);
}

/**
* @param int<1, max> $chunk_size
*/
final public static function chunks_from(Extractor $extractor, int $chunk_size) : Extractor
{
return new Extractor\ChunkExtractor($extractor, $chunk_size);
Expand Down
3 changes: 3 additions & 0 deletions src/core/etl/src/Flow/ETL/DSL/To.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
*/
class To
{
/**
* @param int<1, max> $bufferSize
*/
final public static function buffer(Loader $overflowLoader, int $bufferSize) : Loader
{
return new Loader\BufferLoader($overflowLoader, $bufferSize);
Expand Down
2 changes: 1 addition & 1 deletion src/core/etl/src/Flow/ETL/DataFrame.php
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ public function onError(ErrorHandler $handler) : self
*
* @lazy
*
* @throws InvalidArgumentException
* @param int<1, max> $chunks
*/
public function parallelize(int $chunks) : self
{
Expand Down
3 changes: 3 additions & 0 deletions src/core/etl/src/Flow/ETL/ExternalSort/BufferCache.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ final class BufferCache
*/
private array $buffers = [];

/**
* @param int<1, max> $bufferSize
*/
public function __construct(
private readonly Cache $overflowCache,
private readonly int $bufferSize
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public function sortBy(EntryReference ...$refs) : Extractor
{
/** @var array<string, \Generator<Rows>> $cachedPartsArray */
$cachedPartsArray = [];
$maxRowsSize = 0;
$maxRowsSize = 1;

/** @var int $i */
foreach ($this->cache->read($this->id) as $i => $rows) {
Expand Down
6 changes: 3 additions & 3 deletions src/core/etl/src/Flow/ETL/ExternalSort/MemorySort.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public function __construct(
private readonly Cache $cache,
private Unit $maximumMemory
) {
$this->configuration = new Configuration($safetyBufferPercentage = 10);
$this->configuration = new Configuration(10);

if ($this->configuration->isLessThan($maximumMemory) && !$this->configuration->isInfinite()) {
/**
Expand All @@ -48,7 +48,7 @@ public function sortBy(EntryReference ...$refs) : Extractor
$memoryConsumption = new Consumption();

$mergedRows = new Rows();
$maxSize = 0;
$maxSize = 1;

foreach ($this->cache->read($this->cacheId) as $rows) {
$maxSize = \max($rows->count(), $maxSize);
Expand All @@ -64,6 +64,6 @@ public function sortBy(EntryReference ...$refs) : Extractor

$this->cache->clear($this->cacheId);

return new Extractor\ProcessExtractor(...$mergedRows->sortBy(...$refs)->chunks($maxSize));
return new Extractor\GeneratorExtractor($mergedRows->sortBy(...$refs)->chunks($maxSize));
}
}
3 changes: 3 additions & 0 deletions src/core/etl/src/Flow/ETL/Extractor/BufferExtractor.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@

final class BufferExtractor implements Extractor, OverridingExtractor
{
/**
* @param int<1, max> $maxRowsSize
*/
public function __construct(
private readonly Extractor $extractor,
private readonly int $maxRowsSize
Expand Down
3 changes: 3 additions & 0 deletions src/core/etl/src/Flow/ETL/Extractor/ChunkExtractor.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@

final class ChunkExtractor implements Extractor, OverridingExtractor
{
/**
* @param int<1, max> $chunkSize
*/
public function __construct(
private readonly Extractor $extractor,
private readonly int $chunkSize
Expand Down
34 changes: 34 additions & 0 deletions src/core/etl/src/Flow/ETL/Extractor/GeneratorExtractor.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Extractor;

use Flow\ETL\Exception\InvalidArgumentException;
use Flow\ETL\Extractor;
use Flow\ETL\FlowContext;
use Flow\ETL\Rows;

/**
 * Extractor backed by a generator that is expected to yield Rows instances.
 *
 * Each value produced by the generator is checked while extracting; the first
 * value that is not a Rows instance stops extraction with an exception.
 *
 * @internal
 */
final class GeneratorExtractor implements Extractor
{
    /**
     * @param \Generator<Rows> $rows
     */
    public function __construct(private readonly \Generator $rows)
    {
    }

    /**
     * Yields every Rows instance produced by the wrapped generator, in order.
     *
     * @throws InvalidArgumentException when the generator yields anything other than a Rows instance
     *
     * @return \Generator<Rows>
     */
    public function extract(FlowContext $context) : \Generator
    {
        foreach ($this->rows as $row) {
            if (!$row instanceof Rows) {
                // get_debug_type() is used instead of `$row::class`: `::class` on a
                // non-object value (int, string, array, null, ...) raises a TypeError,
                // which would hide the InvalidArgumentException we intend to throw.
                throw new InvalidArgumentException('Passed generator can contain only Rows class instances, given: ' . \get_debug_type($row));
            }

            yield $row;
        }
    }
}
5 changes: 4 additions & 1 deletion src/core/etl/src/Flow/ETL/Loader/BufferLoader.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,15 @@
use Flow\ETL\Rows;

/**
* @implements Loader<array{overflow_loader: Loader, buffer_size: int}>
* @implements Loader<array{overflow_loader: Loader, buffer_size: int<1, max>}>
*/
final class BufferLoader implements Closure, Loader, OverridingLoader
{
private Rows $buffer;

/**
* @param int<1, max> $bufferSize
*/
public function __construct(private readonly Loader $overflowLoader, private readonly int $bufferSize)
{
$this->buffer = new Rows();
Expand Down
8 changes: 3 additions & 5 deletions src/core/etl/src/Flow/ETL/Pipeline/ParallelizingPipeline.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
namespace Flow\ETL\Pipeline;

use Flow\ETL\DSL\From;
use Flow\ETL\Exception\InvalidArgumentException;
use Flow\ETL\Extractor;
use Flow\ETL\FlowContext;
use Flow\ETL\Loader;
Expand All @@ -20,14 +19,13 @@ final class ParallelizingPipeline implements Pipeline
{
private readonly Pipeline $nextPipeline;

/**
* @param int<1, max> $parallel
*/
public function __construct(
private readonly Pipeline $pipeline,
private readonly int $parallel
) {
if ($parallel < 1) {
throw new InvalidArgumentException("Parallel value can't be lower than 1.");
}

$this->nextPipeline = $pipeline->cleanCopy();
}

Expand Down
19 changes: 7 additions & 12 deletions src/core/etl/src/Flow/ETL/Rows.php
Original file line number Diff line number Diff line change
Expand Up @@ -59,26 +59,21 @@ public function __unserialize(array $data) : void
public function add(Row ...$rows) : self
{
return new self(
...\array_merge($this->rows, $rows)
...$this->rows,
...$rows
);
}

/**
* @return Rows[]
* @param int<1, max> $size
*
* @return \Generator<Rows>
*/
public function chunks(int $size) : array
public function chunks(int $size) : \Generator
{
if ($size < 1) {
throw InvalidArgumentException::because('Chunk size must be greater than 0');
}

$chunks = [];

foreach (\array_chunk($this->rows, $size) as $chunk) {
$chunks[] = new self(...$chunk);
yield new self(...$chunk);
}

return $chunks;
}

public function count() : int
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
use PhpBench\Attributes\Revs;

#[Iterations(5)]
#[Groups(['entry_factory'])]
#[Groups(['building_blocks'])]
final class NativeEntryFactoryBench
{
private array $rowsArray = [];
Expand Down
34 changes: 34 additions & 0 deletions src/core/etl/tests/Flow/ETL/Tests/Benchmark/RowsBench.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<?php declare(strict_types=1);

use Flow\ETL\Rows;
use PhpBench\Attributes\Groups;
use PhpBench\Attributes\Iterations;
use PhpBench\Attributes\Revs;

#[Iterations(5)]
#[Groups(['building_blocks'])]
final class RowsBench
{
    private Rows $rows;

    /**
     * Builds the benchmark fixture: the same five-row batch appended once for
     * every element of range(0, 10_000), as one flat list handed to Rows::fromArray().
     */
    public function __construct()
    {
        $batch = [
            ['id' => 1, 'random' => false, 'text' => null, 'from' => 666],
            ['id' => 2, 'random' => true, 'text' => null, 'from' => 666],
            ['id' => 3, 'random' => false, 'text' => null, 'from' => 666],
            ['id' => 4, 'random' => true, 'text' => null, 'from' => 666],
            ['id' => 5, 'random' => false, 'text' => null, 'from' => 666],
        ];

        $data = [];

        foreach (\range(0, 10_000) as $ignored) {
            foreach ($batch as $row) {
                $data[] = $row;
            }
        }

        $this->rows = Rows::fromArray($data);
    }

    #[Revs(5)]
    public function bench_chunk_10() : void
    {
        // Iterating is all that is needed to drive chunks(); the loop body stays empty on purpose.
        foreach ($this->rows->chunks(10) as $chunk) {
        }
    }
}
Loading

0 comments on commit 6241e8a

Please sign in to comment.