Skip to content

Commit

Permalink
Remove batchsize from loaders (#733)
Browse files Browse the repository at this point in the history
* Removed chunkSize from Loaders

* Added removal of chunk size to upgrade.md
  • Loading branch information
norberttech committed Nov 5, 2023
1 parent 97a7539 commit 02dea7d
Show file tree
Hide file tree
Showing 15 changed files with 99 additions and 116 deletions.
5 changes: 5 additions & 0 deletions UPGRADE.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ Rows are yielded one by one.
descriptive and self-explanatory.
It's no longer mandatory to set this flat to true when using SaveMode::APPEND, it's now set automatically.

### 9) Loaders - chunk size

Loaders are no longer accepting chunk_size parameter, from now in order to control
the number of rows saved at once use `DataFrame::batchSize(int $size)` method.

---

## Upgrading from 0.3.x to 0.4.x
Expand Down
2 changes: 1 addition & 1 deletion src/adapter/etl-adapter-amphp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ $logger->pushHandler(new StreamHandler("php://stderr", LogLevel::ERROR, false));
->withEntry('id', ref('id')->cast('int'))
->withEntry('name', concat(ref('name'), lit(' '), ref('last name')))
->drop('last_name')
->load(new DbalLoader($tableName, $chunkSize = 1000, $dbConnectionParams))
->load(new DbalLoader($tableName, $dbConnectionParams))
->run();
```

Expand Down
6 changes: 3 additions & 3 deletions src/adapter/etl-adapter-doctrine/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ ETL::extract(

All supported types of `DbalBulkLoader` loading:

- `::insert(Connection $connection, int $bulkChunkSize, string $table, QueryFactory $queryFactory = null) : self`
- `::insertOrSkipOnConflict(Connection $connection, int $bulkChunkSize, string $table, QueryFactory $queryFactory = null) : self`
- `::insertOrUpdateOnConstraintConflict(Connection $connection, int $bulkChunkSize, string $table, string $constraint, QueryFactory $queryFactory = null) : self`
- `::insert(Connection $connection, string $table, QueryFactory $queryFactory = null) : self`
- `::insertOrSkipOnConflict(Connection $connection, string $table, QueryFactory $queryFactory = null) : self`
- `::insertOrUpdateOnConstraintConflict(Connection $connection, string $table, string $constraint, QueryFactory $queryFactory = null) : self`

The `bulkSize` means how many rows you want to push to a database in a single `INSERT` query. Each extracted rows set
is going to be split before inserting data into the database.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
/**
* @implements Loader<array{
* table_name: string,
* chunk_size: int<1, max>,
* connection_params: array<string, mixed>,
* operation: string,
* operation_options: array{
Expand All @@ -35,7 +34,6 @@ final class DbalLoader implements Loader
private string $operation;

/**
* @param int<1, max> $chunkSize
* @param array<string, mixed> $connectionParams
* @param array{
* skip_conflicts?: boolean,
Expand All @@ -49,7 +47,6 @@ final class DbalLoader implements Loader
*/
public function __construct(
private string $tableName,
private int $chunkSize,
private array $connectionParams,
private array $operationOptions = [],
string $operation = 'insert'
Expand All @@ -64,7 +61,6 @@ public function __construct(
* Since Connection::getParams() is marked as an internal method, please
* use this constructor with caution.
*
* @param int<1, max> $chunkSize
* @param array{
* skip_conflicts?: boolean,
* constraint?: string,
Expand All @@ -78,12 +74,11 @@ public function __construct(
public static function fromConnection(
Connection $connection,
string $tableName,
int $chunkSize,
array $operationOptions = [],
string $operation = 'insert'
) : self {
/** @psalm-suppress InternalMethod */
$loader = new self($tableName, $chunkSize, $connection->getParams(), $operationOptions, $operation);
$loader = new self($tableName, $connection->getParams(), $operationOptions, $operation);
$loader->connection = $connection;

return $loader;
Expand All @@ -93,7 +88,6 @@ public function __serialize() : array
{
return [
'table_name' => $this->tableName,
'chunk_size' => $this->chunkSize,
'connection_params' => $this->connectionParams,
'operation' => $this->operation,
'operation_options' => $this->operationOptions,
Expand All @@ -103,22 +97,19 @@ public function __serialize() : array
public function __unserialize(array $data) : void
{
$this->tableName = $data['table_name'];
$this->chunkSize = $data['chunk_size'];
$this->connectionParams = $data['connection_params'];
$this->operation = $data['operation'];
$this->operationOptions = $data['operation_options'];
}

public function load(Rows $rows, FlowContext $context) : void
{
foreach ($rows->chunks($this->chunkSize) as $chunk) {
Bulk::create()->{$this->operation}(
$this->connection(),
$this->tableName,
new BulkData($chunk->sortEntries()->toArray()),
$this->operationOptions
);
}
Bulk::create()->{$this->operation}(
$this->connection(),
$this->tableName,
new BulkData($rows->sortEntries()->toArray()),
$this->operationOptions
);
}

private function connection() : Connection
Expand Down
16 changes: 8 additions & 8 deletions src/adapter/etl-adapter-doctrine/src/Flow/ETL/DSL/Dbal.php
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,9 @@ final public static function from_query(
}

/**
* In order to control the size of the single insert, use DataFrame::chunkSize() method just before calling DataFrame::load().
*
* @param array<string, mixed>|Connection $connection
* @param int<1, max> $chunk_size
* @param array{
* skip_conflicts?: boolean,
* constraint?: string,
Expand All @@ -147,17 +148,17 @@ final public static function from_query(
final public static function to_table_insert(
array|Connection $connection,
string $table,
int $chunk_size = 1000,
array $options = [],
) : Loader {
return \is_array($connection)
? new DbalLoader($table, $chunk_size, $connection, $options, 'insert')
: DbalLoader::fromConnection($connection, $table, $chunk_size, $options, 'insert');
? new DbalLoader($table, $connection, $options, 'insert')
: DbalLoader::fromConnection($connection, $table, $options, 'insert');
}

/**
* In order to control the size of the single request, use DataFrame::chunkSize() method just before calling DataFrame::load().
*
* @param array<string, mixed>|Connection $connection
* @param int<1, max> $chunk_size
* @param array{
* skip_conflicts?: boolean,
* constraint?: string,
Expand All @@ -173,11 +174,10 @@ final public static function to_table_insert(
final public static function to_table_update(
array|Connection $connection,
string $table,
int $chunk_size = 1000,
array $options = [],
) : Loader {
return \is_array($connection)
? new DbalLoader($table, $chunk_size, $connection, $options, 'update')
: DbalLoader::fromConnection($connection, $table, $chunk_size, $options, 'update');
? new DbalLoader($table, $connection, $options, 'update')
: DbalLoader::fromConnection($connection, $table, $options, 'update');
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
use Flow\ETL\Exception\InvalidArgumentException;
use Flow\ETL\Flow;
use Flow\Serializer\CompressingSerializer;
use Flow\Serializer\NativePHPSerializer;

final class DbalLoaderTest extends IntegrationTestCase
{
Expand All @@ -34,7 +33,7 @@ public function test_create_loader_with_invalid_operation() : void
$this->expectException(InvalidArgumentException::class);
$this->expectExceptionMessage('Operation can be insert or update, delete given.');

new DbalLoader($table, $bulkSize = 10, $this->connectionParams(), [], 'delete');
new DbalLoader($table, $this->connectionParams(), [], 'delete');
}

public function test_create_loader_with_invalid_operation_from_connection() : void
Expand All @@ -55,7 +54,6 @@ public function test_create_loader_with_invalid_operation_from_connection() : vo
DbalLoader::fromConnection(
$this->pgsqlDatabaseContext->connection(),
$table,
$bulkSize = 10,
[],
'delete'
);
Expand All @@ -73,7 +71,7 @@ public function test_inserts_multiple_rows_at_once() : void
))
->setPrimaryKey(['id']));

$loader = Dbal::to_table_insert($this->connectionParams(), $table, $bulkSize = 10);
$loader = Dbal::to_table_insert($this->connectionParams(), $table);

(new Flow())
->read(
Expand Down Expand Up @@ -103,8 +101,8 @@ public function test_inserts_multiple_rows_at_once_after_serialization_and_deser
->setPrimaryKey(['id'])
);

$serializer = new CompressingSerializer(new NativePHPSerializer());
$loaderSerialized = $serializer->serialize(Dbal::to_table_insert($this->connectionParams(), $table, $bulkSize = 10));
$serializer = new CompressingSerializer();
$loaderSerialized = $serializer->serialize(Dbal::to_table_insert($this->connectionParams(), $table));

(new Flow())
->read(
Expand Down Expand Up @@ -134,7 +132,7 @@ public function test_inserts_multiple_rows_at_once_using_existing_connection() :
->setPrimaryKey(['id'])
);

$loader = Dbal::to_table_insert($this->pgsqlDatabaseContext->connection(), $table, $bulkSize = 10);
$loader = Dbal::to_table_insert($this->pgsqlDatabaseContext->connection(), $table);

(new Flow())
->read(
Expand Down Expand Up @@ -171,7 +169,7 @@ public function test_inserts_multiple_rows_in_two_insert_queries() : void
['id' => 3, 'name' => 'Name Three', 'description' => 'Description Three'],
])
)
->load(Dbal::to_table_insert($this->connectionParams(), $table, $bulkSize = 10))
->load(Dbal::to_table_insert($this->connectionParams(), $table))
->run();

$this->assertEquals(3, $this->pgsqlDatabaseContext->tableCount($table));
Expand All @@ -196,7 +194,7 @@ public function test_inserts_new_rows_and_skip_already_existed() : void
['id' => 3, 'name' => 'Name Three', 'description' => 'Description Three'],
])
)
->load(Dbal::to_table_insert($this->connectionParams(), $table, $bulkSize = 10))
->load(Dbal::to_table_insert($this->connectionParams(), $table))
->run();

(new Flow())
Expand All @@ -207,7 +205,7 @@ public function test_inserts_new_rows_and_skip_already_existed() : void
['id' => 4, 'name' => 'New Name Four', 'description' => 'New Description Three'],
])
)
->load(Dbal::to_table_insert($this->connectionParams(), $table, $bulkSize = 10, ['skip_conflicts' => true]))
->load(Dbal::to_table_insert($this->connectionParams(), $table, ['skip_conflicts' => true]))
->run();

$this->assertEquals(4, $this->pgsqlDatabaseContext->tableCount($table));
Expand Down Expand Up @@ -244,7 +242,7 @@ public function test_inserts_new_rows_or_updates_already_existed_based_on_primar
['id' => 3, 'name' => 'Name Three', 'description' => 'Description Three'],
])
)
->load(Dbal::to_table_insert($this->connectionParams(), $table, $bulkSize = 10))
->load(Dbal::to_table_insert($this->connectionParams(), $table))
->run();

(new Flow())->extract(
Expand All @@ -254,7 +252,7 @@ public function test_inserts_new_rows_or_updates_already_existed_based_on_primar
['id' => 4, 'name' => 'New Name Four', 'description' => 'New Description Three'],
])
)
->load(Dbal::to_table_insert($this->connectionParams(), $table, $bulkSize = 10, ['constraint' => 'flow_doctrine_bulk_test_pkey']))
->load(Dbal::to_table_insert($this->connectionParams(), $table, ['constraint' => 'flow_doctrine_bulk_test_pkey']))
->run();

$this->assertEquals(4, $this->pgsqlDatabaseContext->tableCount($table));
Expand All @@ -281,7 +279,7 @@ public function test_that_operation_is_lower_cased() : void
))
->setPrimaryKey(['id']));

$loader = Dbal::to_table_insert($this->connectionParams(), $table, $bulkSize = 10);
$loader = Dbal::to_table_insert($this->connectionParams(), $table);

$this->assertSame($loader->__serialize()['operation'], 'insert');
}
Expand All @@ -301,7 +299,6 @@ public function test_that_operation_is_lower_cased_from_connection_method() : vo
$loader = Dbal::to_table_insert(
$this->pgsqlDatabaseContext->connection(),
$table,
$bulkSize = 10,
$this->connectionParams()
);

Expand All @@ -320,8 +317,8 @@ public function test_update_multiple_rows_at_once() : void
))
->setPrimaryKey(['id']));

$insertLoader = Dbal::to_table_insert($this->connectionParams(), $table, $bulkSize = 10);
$updateLoader = Dbal::to_table_update($this->connectionParams(), $table, $bulkSize = 10, ['primary_key_columns' => ['id'], ['update_columns' => ['name']]]);
$insertLoader = Dbal::to_table_insert($this->connectionParams(), $table);
$updateLoader = Dbal::to_table_update($this->connectionParams(), $table, ['primary_key_columns' => ['id'], ['update_columns' => ['name']]]);

(new Flow())->extract(
From::array([
Expand All @@ -330,6 +327,7 @@ public function test_update_multiple_rows_at_once() : void
['id' => 3, 'name' => 'Name Three', 'description' => 'Description Three'],
])
)
->batchSize(10)
->load($insertLoader)
->run();

Expand All @@ -341,6 +339,7 @@ public function test_update_multiple_rows_at_once() : void
['id' => 3, 'name' => 'Changed Name Three', 'description' => 'Description Three'],
])
)
->batchSize(10)
->load($updateLoader)
->run();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public function test_extracting_multiple_rows_at_once() : void
['id' => 3, 'name' => 'Name Three', 'description' => 'Description Three'],
])
)->load(
DbalLoader::fromConnection($this->pgsqlDatabaseContext->connection(), $table, chunkSize: 10)
DbalLoader::fromConnection($this->pgsqlDatabaseContext->connection(), $table)
)->run();

$rows = (new Flow())->extract(
Expand Down Expand Up @@ -83,7 +83,7 @@ public function test_extracting_multiple_rows_multiple_times() : void
['id' => 10, 'name' => 'Name', 'description' => 'Description'],
])
)
->load(DbalLoader::fromConnection($this->pgsqlDatabaseContext->connection(), $table, chunkSize: 10))
->load(DbalLoader::fromConnection($this->pgsqlDatabaseContext->connection(), $table))
->run();

$rows = (new Flow())->extract(
Expand Down
Loading

0 comments on commit 02dea7d

Please sign in to comment.