Optimize some kinds of load jobs #1762

Merged
merged 4 commits into from
Sep 9, 2019
2 changes: 2 additions & 0 deletions be/src/runtime/stream_load/stream_load_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ std::string StreamLoadContext::to_json() const {
break;
case TStatusCode::LABEL_ALREADY_EXISTS:
writer.String("Label Already Exists");
writer.Key("ExistingJobStatus");
writer.String(existing_job_status.c_str());
break;
default:
writer.String("Fail");
Expand Down
3 changes: 3 additions & 0 deletions be/src/runtime/stream_load/stream_load_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ class StreamLoadContext {
int64_t start_nanos = 0;
int64_t load_cost_nanos = 0;
std::string error_url = "";
// If the label is already in use, the existing job's status is stored here.
// Expected to be RUNNING or FINISHED.
std::string existing_job_status = "";

KafkaLoadInfo* kafka_info = nullptr;

Expand Down
3 changes: 3 additions & 0 deletions be/src/runtime/stream_load/stream_load_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext* ctx) {
if (!status.ok()) {
LOG(WARNING) << "begin transaction failed, errmsg=" << status.get_error_msg()
<< ctx->brief();
if (result.__isset.job_status) {
ctx->existing_job_status = result.job_status;
}
return status;
}
ctx->txn_id = result.txnId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ The Insert Into command must be submitted through the MySQL protocol; creating a load request synchronously
Syntax:

```
INSERT INTO table_name [partition_info] [col_list] [query_stmt] [VALUES];
INSERT INTO table_name [WITH LABEL label] [partition_info] [col_list] [query_stmt] [VALUES];
```

Examples:

```
INSERT INTO tbl2 SELECT * FROM tbl3;
INSERT INTO tbl2 WITH LABEL label1 SELECT * FROM tbl3;
INSERT INTO tbl1 VALUES ("qweasdzxcqweasdzxc"), ("a");
```

Expand All @@ -50,6 +50,14 @@ INSERT INTO tbl1 VALUES ("qweasdzxcqweasdzxc"), ("a");

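*Note: The VALUES form is only suitable for loading a few rows as a demo; it is entirely unsuitable for any test or production environment. Doris itself is not suited to single-row loads. Batch loading with INSERT INTO SELECT is recommended.*

* WITH LABEL

As a load job, an INSERT operation can also be given a label. If none is specified, the system automatically assigns a UUID as the label.

This feature requires version 0.11+.

*Note: Specifying a Label is recommended over letting the system assign one. With a system-assigned Label, if the connection drops during execution of the Insert Into statement (for example, because of a network error), there is no way to know whether the Insert Into succeeded. With a user-specified Label, the job result can be looked up again by that Label.*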

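### Load results

Insert Into is itself an SQL command, so its return behavior is the same as that of any SQL command.
Expand All @@ -66,13 +74,24 @@ Insert Into is itself an SQL command, so its return behavior is the same as an SQL command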
Query OK, 100 row affected, 0 warning (0.22 sec)
```

If the load partially succeeds, a Label field is also appended. For example:
If the user specifies a Label, the Label is returned as well:
```
Query OK, 100 row affected, 0 warning (0.22 sec)
{'label':'user_specified_label'}
```

If the load partially succeeds, a Label field is appended. For example:

```
Query OK, 100 row affected, 1 warning (0.23 sec)
{'label':'7d66c457-658b-4a3e-bdcf-8beee872ef2c'}
```

```
Query OK, 100 row affected, 1 warning (0.23 sec)
{'label':'user_specified_label'}
```

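Here, affected is the number of rows loaded and warning is the number of rows that failed. To see the rejected rows, run `SHOW LOAD WHERE LABEL="xxx";` and open the returned url.

If there is no data at all, success is still returned, and both affected and warning are 0.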
Expand Down Expand Up @@ -152,7 +171,7 @@ bj_store_sales schema:
Because the user wants to run ETL on the data in one table and load it into the target table, the Insert into query\_stmt form should be used.

```
INSERT INTO bj_store_sales SELECT id, total, user_id, sale_timestamp FROM store_sales where region = "bj";
INSERT INTO bj_store_sales WITH LABEL `label` SELECT id, total, user_id, sale_timestamp FROM store_sales where region = "bj";
```

## Common problems
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ Because Stream load uses the HTTP protocol, everything related to the load job
"TxnId": 1003,
"Label": "b6f3bc78-0d2c-45d9-9e4c-faa0a0149bee",
"Status": "Success",
"ExistingJobStatus": "FINISHED", // optional
"Message": "OK",
"NumberTotalRows": 1000000,
"NumberLoadedRows": 1000000,
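Expand All @@ -190,6 +191,10 @@ Because Stream load uses the HTTP protocol, everything related to the load job
"Label Already Exists": The Label is duplicated; use a different Label.

"Fail": The load failed.

+ ExistingJobStatus: The status of the load job that already holds this Label.

This field is shown only when Status is "Label Already Exists". It lets the user learn the status of the load job that already holds the Label. "RUNNING" means the job is still executing, and "FINISHED" means the job succeeded.

+ Message: Load error message.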

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,19 @@

```
INSERT INTO table_name
[ WITH LABEL label]
[ PARTITION (, ...) ]
[ (column [, ...]) ]
[ \[ hint [, ...] \] ]
[ [ hint [, ...] ] ]
{ VALUES ( { expression | DEFAULT } [, ...] ) [, ...] | query }
```

### Parameters

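> table_name: The target table of the load. It can be in the form `db_name.table_name`
>
> label: Specifies a label for the Insert job
>
> partition_names: The partitions to load into. They must exist in `table_name`; separate multiple partition names with commas
>
> column_name: The target columns. They must exist in `table_name`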
Expand Down Expand Up @@ -50,27 +53,27 @@ INSERT INTO test (c1) VALUES (1);
2. Import multiple rows into the `test` table at once

```
INSERT INTO test VALUES (1, 2), (3, 2 + 2)
INSERT INTO test (c1, c2) VALUES (1, 2), (3, 2 * 2)
INSERT INTO test (c1) VALUES (1), (3)
INSERT INTO test (c1, c2) VALUES (1, DEFAULT), (3, DEFAULT)
INSERT INTO test VALUES (1, 2), (3, 2 + 2);
INSERT INTO test (c1, c2) VALUES (1, 2), (3, 2 * 2);
INSERT INTO test (c1) VALUES (1), (3);
INSERT INTO test (c1, c2) VALUES (1, DEFAULT), (3, DEFAULT);
```

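The first and second statements have the same effect: they import two rows into the `test` table at once
The third and fourth statements have the same effect: they import two rows into the `test` table using the default value of column `c2`

3. Synchronously import the result of a query statement into the `test` table
3. Import the result of a query statement into the `test` table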

```
INSERT INTO test [streaming] SELECT * FROM test2
INSERT INTO test (c1, c2) [streaming] SELECT * from test2
INSERT INTO test SELECT * FROM test2;
INSERT INTO test (c1, c2) SELECT * from test2;
```

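4. Asynchronously import the result of a query statement into the `test` table
4. Import the result of a query statement into the `test` table and specify a label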

```
INSERT INTO test SELECT * FROM test2
INSERT INTO test (c1, c2) SELECT * from test2
INSERT INTO test WITH LABEL `label1` SELECT * FROM test2;
INSERT INTO test WITH LABEL `label1` (c1, c2) SELECT * from test2;
```

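An asynchronous import is in fact a synchronous import wrapped as an asynchronous one. Specifying streaming or omitting it makes **no difference in execution efficiency**.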
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ The Insert Into command needs to be submitted through MySQL protocol. Creating a
Grammar:

```
INSERT INTO table_name [partition_info] [col_list] [query_stmt] [VALUES];
INSERT INTO table_name [WITH LABEL label] [partition_info] [col_list] [query_stmt] [VALUES];
```

Examples:

```
INSERT INTO tbl2 SELECT * FROM tbl3;
INSERT INTO tbl2 WITH LABEL label1 SELECT * FROM tbl3;
INSERT INTO tbl1 VALUES ("qweasdzxcqweasdzxc"), ("a");
```

Expand All @@ -49,12 +49,56 @@ The following is a brief introduction to the parameters used in creating import
Users can insert one or more data through VALUES grammar.

*Note: VALUES is only suitable for importing several pieces of data as DEMO. It is totally unsuitable for any test and production environment. Doris system itself is not suitable for single data import scenarios. It is recommended to use INSERT INTO SELECT for batch import.*

* WITH LABEL

Because INSERT runs as a load job, it can also be given a label. If no label is specified, Doris uses a UUID as the label.

This feature requires Doris version 0.11+.

*Note: Specifying a Label is recommended over letting the system allocate one. With a system-allocated Label, if the connection drops during execution of the Insert Into statement (for example, because of a network error), there is no way to know whether the Insert Into succeeded. With a user-specified Label, the task result can be looked up again by that Label.*
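
The retry pattern this note describes can be sketched on the client side as follows. This is a minimal illustration, not part of Doris: the helper names and the `<table>_<batch_id>` label scheme are assumptions, and the inputs are taken to be trusted identifiers (no escaping is done). The point is that the label is derived from the batch rather than generated per attempt, so a retry after a dropped connection reuses the same label.

```cpp
#include <string>

// Builds a label that stays the same across retries of one logical batch.
// The "<table>_<batch_id>" scheme is an assumption made for illustration.
inline std::string make_label(const std::string& table, const std::string& batch_id) {
    return table + "_" + batch_id;
}

// Builds an INSERT statement carrying the label, following the documented form
//   INSERT INTO table_name [WITH LABEL label] ... query
inline std::string insert_with_label(const std::string& table,
                                     const std::string& label,
                                     const std::string& query) {
    return "INSERT INTO " + table + " WITH LABEL " + label + " " + query + ";";
}

// Builds the statement used to look the job up again by its label,
// for example after the connection was lost mid-statement.
inline std::string show_load_by_label(const std::string& label) {
    return "SHOW LOAD WHERE LABEL = \"" + label + "\";";
}
```

For example, `insert_with_label("tbl2", "label1", "SELECT * FROM tbl3")` yields the same statement as the example above, and `show_load_by_label("label1")` gives the follow-up query to run once the connection is re-established.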

### Load results

Insert Into itself is an SQL command, so the return behavior is the same as the return behavior of the SQL command.

If the import fails, the return statement fails to execute. If the import succeeds, the return statement executes successfully and a Label field is appended.
If the load fails, the error will be returned. Examples are as follows:


```
ERROR 1064 (HY000): All partitions have no load data. url: http://ip:port/api/_load_error_log?File=_shard_14/error_log_insert_stmt_f435264d82f342e4-a33764f5f0dfbf00_f4364d82f342e4_a764f344e4_a764f5f5f0df0df0dbf00
```

The URL can be used to query the rejected data; see the **view error line** section below.

If the load succeeds, the success will be returned. Examples are as follows:

```
Query OK, 100 row affected, 0 warning (0.22 sec)
```

If the user specifies a Label, the Label is returned as well.

```
Query OK, 100 row affected, 0 warning (0.22 sec)
{'label':'user_specified_label'}
```

If the load is partially successful, a Label field is appended. Examples are as follows:

```
Query OK, 100 row affected, 1 warning (0.23 sec)
{'label':'7d66c457-658b-4a3e-bdcf-8beee872ef2c'}
```

```
Query OK, 100 row affected, 1 warning (0.23 sec)
{'label':'user_specified_label'}
```

Here, affected is the number of rows loaded and warning is the number of rows that failed. To see the rejected rows, run `SHOW LOAD WHERE LABEL='xxx';` and open the returned url.

If there is no data, success is still returned, and both affected and warning are 0.

Label is the identifier of the Insert Into import job. Each import job has a unique Label inside a single database. Insert Into's Label is generated by the system. Users can use the Label to asynchronously obtain the import status by querying the import command.

Expand All @@ -70,7 +114,7 @@ Label is the identifier of the Insert Into import job. Each import job has a uni

The Insert Into statement is also subject to the Session variable `query_timeout`. You can increase the timeout with `SET query_timeout = xxx;`, where the value is in seconds.

### Session 变量
### Session Variables

+ enable\_insert\_strict

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ Examples:
"TxnId": 1003,
"Label": "b6f3bc78-0d2c-45d9-9e4c-faa0a0149bee",
"Status": "Success",
"ExistingJobStatus": "FINISHED", // optional
"Message": "OK",
"NumberTotalRows": 1000000,
"NumberLoadedRows": 1000000,
Expand All @@ -151,6 +152,10 @@ The following main explanations are given for the Stream load import result para
"Label Already Exists": The Label is duplicated; use a different Label.

"Fail": Import failed.

+ ExistingJobStatus: The state of the load job corresponding to the existing Label.

This field is displayed only when Status is "Label Already Exists". It tells the user the state of the load job that already holds the Label. "RUNNING" means that the job is still executing, and "FINISHED" means that the job succeeded.

+ Message: Import error messages.
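
One way a client might act on `Status` together with `ExistingJobStatus` is sketched below. The enum, the function name, and the policy itself are assumptions for illustration; in particular, treating "FINISHED" as done is only safe if the earlier job with the same Label carried the same data. Any `Status` value not listed in this section is handled as a failure here.

```cpp
#include <string>

// Hypothetical client-side actions; these names are illustrative, not part of Doris.
enum class NextStep {
    kDone,             // data is loaded, nothing more to do
    kWaitForExisting,  // an earlier submission with this label is still running
    kRetryNewLabel,    // label is taken and the earlier job's state is unknown
    kHandleFailure     // inspect "Message" and decide whether to resubmit
};

// Maps the "Status" and optional "ExistingJobStatus" fields of a Stream Load
// response to a next step. Pass an empty string when ExistingJobStatus is absent.
inline NextStep next_step(const std::string& status,
                          const std::string& existing_job_status) {
    if (status == "Success") return NextStep::kDone;
    if (status == "Label Already Exists") {
        // Assumption: the earlier job with this label loaded the same data.
        if (existing_job_status == "FINISHED") return NextStep::kDone;
        if (existing_job_status == "RUNNING") return NextStep::kWaitForExisting;
        return NextStep::kRetryNewLabel;
    }
    return NextStep::kHandleFailure;
}
```

With this policy, a resubmission that gets "Label Already Exists" plus "RUNNING" waits for the first attempt instead of loading the data a second time under a fresh Label.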

Expand Down
Original file line number Diff line number Diff line change
@@ -1,37 +1,40 @@
"35; INSERT
Description
'35;'35;' 35; Syntax
# INSERT
## Description
### Syntax

```
INSERT INTO table_name
[PARTICIPATION [...]
[ WITH LABEL label]
[ PARTITION (, ...) ]
[ (column [, ...]) ]
[ \[ hint [, ...] \] ]
[ [ hint [, ...] ] ]
{ VALUES ( { expression | DEFAULT } [, ...] ) [, ...] | query }
```

### Parameters

> tablet_name: Target table for importing data. It can be in the form of `db_name. table_name'.
> table_name: Target table for loading data. It can be in the form of `db_name.table_name`.
>
> partition_names: Specifies that the partition to be imported must be a partition that exists in `table_name', with multiple partition names separated by commas
> label: Specifies a label for Insert job.
>
> column_name: The specified destination column must be a column that exists in `table_name'.
> partition_names: Specifies the partitions to be loaded, with multiple partition names separated by commas. The partitions must exist in `table_name`.
>
> expression: The corresponding expression that needs to be assigned to a column
> column_name: The specified destination columns must be columns that exist in `table_name`.
>
> expression: The corresponding expression that needs to be assigned to a column.
>
> DEFAULT: Let the corresponding columns use default values
>
> query: A common query whose results are written to the target
>
> hint: Indicators used to indicate `INSERT'execution. ` Both streaming `and default non `streaming'methods use synchronization to complete `INSERT' statement execution
> hint: Indicators that control how `INSERT` is executed. Both the `streaming` mode and the default non-`streaming` mode complete the `INSERT` statement synchronously.
> The non-`streaming` mode returns a label after execution so that users can query the load status through `SHOW LOAD`.

'35;'35;' 35; Note
### Note

When an `INSERT` statement is executed, the default behavior for data that does not conform to the target table format (for example, a string that is too long) is to filter it out. For business scenarios where data must not be filtered, set the session variable `enable_insert_strict` to `true` so that `INSERT` does not succeed when any data is filtered out.

'35;'35; example
## example

The `test` table contains two columns, `c1` and `c2`.

Expand Down Expand Up @@ -59,18 +62,19 @@ INSERT INTO test (c1, c2) VALUES (1, DEFAULT), (3, DEFAULT)
The first and second statements have the same effect: two rows are imported into the `test` table at one time.
The third and fourth statements have the same effect: two rows are imported into the `test` table using the default value of the `c2` column.

3. Synchronously import the results of a query statement into the `test` table

3. Insert into table `test` with a query stmt.

```
INSERT INTO test [streaming] SELECT * FROM test2
INSERT INTO test (c1, c2) [streaming] SELECT * from test2
INSERT INTO test SELECT * FROM test2
INSERT INTO test (c1, c2) SELECT * from test2
```

4. Asynchronously import the results of a query statement into the `test` table
4. Insert into table `test` with specified label

```
INSERT INTO test SELECT * FROM test2
INSERT INTO test (c1, c2) SELECT * from test2
INSERT INTO test WITH LABEL `label1` SELECT * FROM test2;
INSERT INTO test WITH LABEL `label1` (c1, c2) SELECT * from test2;
```

An asynchronous import is in fact a synchronous import wrapped as an asynchronous one. Specifying streaming or omitting it makes **no difference in execution efficiency**.
Expand Down
16 changes: 14 additions & 2 deletions fe/src/main/cup/sql_parser.cup
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ nonterminal AccessPrivilege privilege_type;
nonterminal DataDescription data_desc;
nonterminal List<DataDescription> data_desc_list;
nonterminal LabelName job_label;
nonterminal String opt_with_label;
nonterminal String opt_system;
nonterminal String opt_cluster;
nonterminal BrokerDesc opt_broker;
Expand Down Expand Up @@ -2334,9 +2335,9 @@ use_stmt ::=

// Insert statement
insert_stmt ::=
KW_INSERT KW_INTO insert_target:target opt_col_list:cols opt_plan_hints:hints insert_source:source
KW_INSERT KW_INTO insert_target:target opt_with_label:label opt_col_list:cols opt_plan_hints:hints insert_source:source
{:
RESULT = new InsertStmt(target, cols, source, hints);
RESULT = new InsertStmt(target, label, cols, source, hints);
:}
// TODO(zc) add default value for SQL-2003
// | KW_INSERT KW_INTO insert_target:target KW_DEFAULT KW_VALUES
Expand All @@ -2349,6 +2350,17 @@ insert_target ::=
:}
;

opt_with_label ::=
/* empty */
{:
RESULT = null;
:}
| KW_WITH KW_LABEL ident:label
{:
RESULT = label;
:}
;

insert_source ::=
query_stmt:query
{:
Expand Down
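
The effect of the `opt_with_label` rule can be illustrated outside the parser generator with a small hand-written recognizer. This is a sketch under simplifying assumptions, not how the CUP-generated parser works: tokens are split on whitespace, the insert target is a single token, and a backquoted label is returned with its backquotes. An absent clause is reported as `has_label == false`, mirroring `RESULT = null` in the empty alternative.

```cpp
#include <algorithm>
#include <cctype>
#include <sstream>
#include <string>
#include <vector>

// Result of the optional clause; has_label == false mirrors RESULT = null.
struct OptLabel {
    bool has_label;
    std::string label;
};

// Uppercases a token so that keywords compare case-insensitively.
inline std::string upper(std::string s) {
    std::transform(s.begin(), s.end(), s.begin(),
                   [](unsigned char c) { return static_cast<char>(std::toupper(c)); });
    return s;
}

// Looks at whitespace-separated tokens of the form
//   INSERT INTO <target> [WITH LABEL <ident>] ...
// and returns the label if the optional clause directly follows the target.
inline OptLabel parse_opt_with_label(const std::string& stmt) {
    std::istringstream in(stmt);
    std::vector<std::string> tok;
    for (std::string t; in >> t;) tok.push_back(t);

    OptLabel none{false, ""};
    if (tok.size() < 3 || upper(tok[0]) != "INSERT" || upper(tok[1]) != "INTO") return none;
    // tok[2] is the insert target; the optional clause needs three more tokens.
    if (tok.size() >= 6 && upper(tok[3]) == "WITH" && upper(tok[4]) == "LABEL") {
        return OptLabel{true, tok[5]};
    }
    return none;
}
```

Note that a statement whose source starts with `WITH` but is not followed by `LABEL` (a common table expression, for instance) falls through to the empty alternative, so the recognizer only claims the clause when both keywords are present.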