Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: batch size hint of stream consumption #16372

Merged
merged 18 commits into from
Sep 4, 2024

Conversation

zhyass
Copy link
Member

@zhyass zhyass commented Sep 2, 2024

I hereby agree to the terms of the CLA available at: https://docs.databend.com/dev/policies/cla/

Summary

Introduces a feature that allows setting a batch size hint for stream consumption. This feature enables the system to optimize how data is read from a stream by providing an upper limit on the batch size, improving efficiency, especially when dealing with large data streams. The change includes updates to the logic for handling stream consumption and adds tests to ensure that the batch size hint works as intended.

Usage

stream_name WITH ([CONSUME = [true | false] , MAX_BATCH_SIZE = num])

NOTE:

  1. Within the same transaction, the batch size for a stream must remain consistent.
  2. It does not guarantee that the data read will be strictly below this limit; the actual size may vary.
mysql> create table t(a int);
Query OK, 0 rows affected (0.15 sec)

mysql> create stream s on table t;
Query OK, 0 rows affected (0.11 sec)

mysql> INSERT INTO t values(1);
Query OK, 1 row affected (0.21 sec)

mysql> INSERT INTO t values(2);
Query OK, 1 row affected (0.11 sec)

mysql> INSERT INTO t values(3);
Query OK, 1 row affected (0.12 sec)

mysql> select * from s with (max_batch_size= 1);
+------+---------------+------------------+----------------------------------------+
| a    | change$action | change$is_update | change$row_id                          |
+------+---------------+------------------+----------------------------------------+
|    1 | INSERT        |                0 | ec5b7cab2e5e4da0bad61f841f1110d9000000 |
+------+---------------+------------------+----------------------------------------+
1 row in set (0.26 sec)
Read 1 rows, 5.00 B in 0.052 sec., 19.29 rows/sec., 96.45 B/sec.

mysql> create table t1(a int);
Query OK, 0 rows affected (0.07 sec)

mysql> merge into t1 using s  with(max_batch_size = 1) on t1.a=s.a when matched then delete when not matched then insert(a) values(s.a);
+-------------------------+------------------------+
| number of rows inserted | number of rows deleted |
+-------------------------+------------------------+
|                       1 |                      0 |
+-------------------------+------------------------+
1 row in set (0.46 sec)
Read 1 rows, 5.00 B in 0.205 sec., 4.87 rows/sec., 24.34 B/sec.

mysql> select * from t1;
+------+
| a    |
+------+
|    1 |
+------+
1 row in set (0.05 sec)
Read 1 rows, 5.00 B in 0.015 sec., 65.84 rows/sec., 329.20 B/sec.

mysql> create or replace stage test_stage_stream_06;
Query OK, 0 rows affected (0.09 sec)

mysql> copy into @test_stage_stream_06/case1 from s  with(max_batch_size = 1);
+---------------+-------------+--------------+
| rows_unloaded | input_bytes | output_bytes |
+---------------+-------------+--------------+
|             1 |          67 |         1115 |
+---------------+-------------+--------------+
1 row in set (0.27 sec)
Read 1 rows, 5.00 B in 0.077 sec., 13.04 rows/sec., 65.21 B/sec.

mysql> select * from @test_stage_stream_06/case1;
+------+---------------+------------------+----------------------------------------+
| a    | change$action | change$is_update | change$row_id                          |
+------+---------------+------------------+----------------------------------------+
|    2 | INSERT        |                0 | 2d6778e7dd8d45bb8e0b4edd7b5b852f000000 |
+------+---------------+------------------+----------------------------------------+
1 row in set (0.10 sec)
Read 1 rows, 83.00 B in 0.050 sec., 19.89 rows/sec., 1.61 KiB/sec.

mysql> select * from s  with(consume=true, max_batch_size = 1);
+------+---------------+------------------+----------------------------------------+
| a    | change$action | change$is_update | change$row_id                          |
+------+---------------+------------------+----------------------------------------+
|    3 | INSERT        |                0 | fea71f59682e4fbd95d298b840abd9f7000000 |
+------+---------------+------------------+----------------------------------------+
1 row in set (0.20 sec)
Read 1 rows, 5.00 B in 0.035 sec., 28.3 rows/sec., 141.52 B/sec.

mysql> select * from s;
Empty set (0.18 sec)
Read 0 rows, 0.00 B in 0.014 sec., 0 rows/sec., 0.00 B/sec.
mysql> select s.a from s  with(max_batch_size = 1) join s as s1 on s.a=s1.a;
ERROR 1105 (HY000): StorageUnsupported. Code: 3902, Text = Within the same transaction, the batch size for a stream must remain consistent.
mysql> insert into t values(5);
Query OK, 1 row affected (0.15 sec)

mysql> select * from s;
+------+---------------+------------------+----------------------------------------+
| a    | change$action | change$is_update | change$row_id                          |
+------+---------------+------------------+----------------------------------------+
|    5 | INSERT        |                0 | 17355eae5d8f48ed81c8c4dd180cb925000000 |
+------+---------------+------------------+----------------------------------------+
1 row in set (0.21 sec)
Read 1 rows, 5.00 B in 0.040 sec., 25 rows/sec., 125.01 B/sec.

mysql> begin;
Query OK, 0 rows affected (0.02 sec)

mysql> insert into t1 select a from s  with(max_batch_size = 1);
Query OK, 1 row affected (0.29 sec)

mysql> insert into t1 select a from s;
ERROR 1105 (HY000): StorageUnsupported. Code: 3902, Text = Within the same transaction, the batch size for a stream must remain consistent.
mysql> rollback;
Query OK, 0 rows affected (0.02 sec)
  1. WITH (CONSUME = true, ..) is not allowed in DML.
mysql> insert into t2 select a from s with (consume=true, max_batch_size = 1);
ERROR 1105 (HY000): SyntaxException. Code: 1005, Text = WITH CONSUME only allowed in query.

mysql> insert into t2 select a from s with (max_batch_size = 1);
Query OK, 0 rows affected (0.42 sec)

Tests

  • Unit Test
  • Logic Test
  • Benchmark Test
  • No Test - Explain why

Type of change

  • Bug Fix (non-breaking change which fixes an issue)
  • New Feature (non-breaking change which adds functionality)
  • Breaking Change (fix or feature that could cause existing functionality not to work as expected)
  • Documentation Update
  • Refactoring
  • Performance Improvement
  • Other (please describe):

This change is Reviewable

@dosubot dosubot bot added the size:L This PR changes 100-499 lines, ignoring generated files. label Sep 2, 2024
@zhyass zhyass marked this pull request as draft September 2, 2024 15:16
@github-actions github-actions bot added the pr-feature this PR introduces a new feature to the codebase label Sep 2, 2024
@dosubot dosubot bot added the A-query Area: databend query label Sep 2, 2024
@zhyass zhyass marked this pull request as ready for review September 3, 2024 16:19
@dosubot dosubot bot added size:XL This PR changes 500-999 lines, ignoring generated files. C-feature Category: feature and removed size:L This PR changes 100-499 lines, ignoring generated files. labels Sep 3, 2024
- add some comments
- add extra cases
Copy link
Member

@dantengsky dantengsky left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. @SkyFan2002 PTAL

@dosubot dosubot bot added the lgtm This PR has been approved by a maintainer label Sep 3, 2024
dantengsky and others added 3 commits September 4, 2024 08:24
@BohuTANG
Copy link
Member

BohuTANG commented Sep 4, 2024

We should introduce the syntax for the new option max_batch_size_hint.

@zhyass zhyass marked this pull request as draft September 4, 2024 01:57
@zhyass zhyass marked this pull request as ready for review September 4, 2024 09:29
@zhyass zhyass requested a review from dantengsky September 4, 2024 09:29
@dantengsky dantengsky added this pull request to the merge queue Sep 4, 2024
@BohuTANG BohuTANG removed this pull request from the merge queue due to a manual request Sep 4, 2024
@BohuTANG BohuTANG merged commit c5e75f5 into databendlabs:main Sep 4, 2024
83 checks passed
@BohuTANG
Copy link
Member

BohuTANG commented Sep 5, 2024

@soyeric128 Eric, this new stream option need doc, thanks.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A-query Area: databend query C-feature Category: feature lgtm This PR has been approved by a maintainer pr-feature this PR introduces a new feature to the codebase size:XL This PR changes 500-999 lines, ignoring generated files.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Feature: batch size hint of stream consumption
5 participants