add data transfer between tasks
jieguangzhou committed Oct 27, 2022
1 parent 21caec0 commit b27f0a1
Showing 18 changed files with 1,044 additions and 1 deletion.
102 changes: 102 additions & 0 deletions docs/docs/en/guide/parameter/file-parameter.md
@@ -0,0 +1,102 @@
# FILE Parameter

Use the file parameter to pass files (or folders, both referred to below as **files**) from the working directory of an upstream task to a downstream task in the same workflow instance. Typical scenarios include:

- In the ETL task, pass the data files processed by multiple upstream tasks to a specific downstream task.
- In the machine learning scenario, pass the data set file of the upstream data preparation task to the downstream model training task.

## Usage

### Configure file parameter

To configure a file parameter, click the plus sign to the right of "Custom Parameters" on the task definition page.

### Output file to downstream task

**The four custom parameter options are:**

- Parameter name: the identifier used when passing the file between tasks, such as `KEY1` and `KEY2` in the figure below
- Direction: OUT, which means outputting the file to the downstream task
- Parameter type: FILE, indicating a file parameter
- Parameter value: the output file path, such as `data` and `data/test2/text.txt` in the figure below

The configuration in the figure below indicates that the `output` task passes two files to downstream tasks:

- Pass out the folder `data` and mark it as `dir-data`. The downstream task can get this folder through `output.dir-data`
- Pass out the file `data/test2/text.txt` and mark it as `file-text`. The downstream task can get this file through `output.file-text`

![img.png](../../../../img/new_ui/dev/parameter/file_parameter_output.png)

### Get the file from the upstream task

**The four custom parameter options are:**

- Parameter name: the path where the upstream file is saved after it is received, such as `input_dir` in the figure below
- Direction: IN, which means getting the file from the upstream task
- Parameter type: FILE, indicating a file parameter
- Parameter value: the identifier of the upstream file, in the format `taskName.KEY`. For example, `output.dir-data` in the figure below, where `output` is the name of the upstream task and `dir-data` is the file identifier output by that task

The configuration in the figure below indicates that the task gets the folder identified by `dir-data` from the upstream task `output` and saves it as `input_dir`

![img.png](../../../../img/new_ui/dev/parameter/file_parameter_input_dir.png)

The configuration in the figure below indicates that the task gets the file identified by `file-text` from the upstream task `output` and saves it as `input.txt`

![img.png](../../../../img/new_ui/dev/parameter/file_parameter_input_file.png)
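
To make the `taskName.KEY` format more concrete, here is a minimal Java sketch that splits an IN parameter value into the upstream task name and the file identifier. The class `FileParamRef` and its `parse` method are hypothetical names used only for illustration and are not part of DolphinScheduler. The sketch assumes the identifier itself contains no dot, so it splits at the last dot.

```java
public final class FileParamRef {
    public final String taskName;
    public final String key;

    private FileParamRef(String taskName, String key) {
        this.taskName = taskName;
        this.key = key;
    }

    // Splits "taskName.KEY" at the last dot (assumption: KEY contains no dot).
    public static FileParamRef parse(String value) {
        int dot = value.lastIndexOf('.');
        if (dot <= 0 || dot == value.length() - 1) {
            throw new IllegalArgumentException("expected taskName.KEY but got: " + value);
        }
        return new FileParamRef(value.substring(0, dot), value.substring(dot + 1));
    }

    public static void main(String[] args) {
        FileParamRef ref = parse("output.dir-data");
        System.out.println(ref.taskName + " -> " + ref.key); // prints "output -> dir-data"
    }
}
```

A value without a dot, such as `dir-data` on its own, is rejected, which mirrors the requirement that the upstream task name is always part of the value.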

## Other

### Note

- File transfer between upstream and downstream tasks uses the resource center as an intermediary, and the data is saved in the `DATA_TRANSFER` directory of the resource center. Therefore, **the resource center function must be enabled**; otherwise the file parameter function cannot be used. See [Resource Center Configuration Details](../resource/configuration.md) for details.
- The file naming rule is `DATA_TRANSFER/DATE/ProcessDefineCode/ProcessDefineVersion_ProcessInstanceID/TaskName_TaskInstanceID_FileName`
- If the transferred file is a folder, it is packaged into a compressed file with the `.zip` suffix and uploaded. The downstream task unzips it and saves it in the corresponding directory after receiving it
- To delete file data, delete the corresponding folder in the `DATA_TRANSFER` directory of the resource center. Deleting a date subdirectory directly deletes all the file data for that date
- If there is a task chain task1->task2->task3, the downstream task task3 can also get the file data of task1
- One-to-many and many-to-one transfers are supported
- Frequent transfers of large numbers of files will affect system IO performance, depending on the amount of data transferred
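
The naming rule in the notes above can be sketched in Java as follows. This is only an illustration: the class `DataTransferPath`, its `build` method, and all sample values are hypothetical. It also assumes that `DATE` is written as `yyyyMMdd`, which matches how the cleanup code in this commit derives its cutoff date, but the exact format used on upload is not stated in this document.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

public final class DataTransferPath {
    private DataTransferPath() {
    }

    // Follows DATA_TRANSFER/DATE/ProcessDefineCode/ProcessDefineVersion_ProcessInstanceID/TaskName_TaskInstanceID_FileName
    public static String build(LocalDate date, long processDefineCode, int processDefineVersion,
                               int processInstanceId, String taskName, int taskInstanceId, String fileName) {
        // Assumption: DATE is formatted as yyyyMMdd, e.g. 20221027.
        String day = date.format(DateTimeFormatter.BASIC_ISO_DATE);
        return String.format(Locale.ROOT, "DATA_TRANSFER/%s/%d/%d_%d/%s_%d_%s",
                day, processDefineCode, processDefineVersion, processInstanceId,
                taskName, taskInstanceId, fileName);
    }

    public static void main(String[] args) {
        // All values below are made up for the example.
        System.out.println(build(LocalDate.of(2022, 10, 27), 123456789L, 1, 42, "output", 7, "text.txt"));
        // prints "DATA_TRANSFER/20221027/123456789/1_42/output_7_text.txt"
    }
}
```

Because the date is the first path component under `DATA_TRANSFER`, removing one date subdirectory removes every file transferred on that date, as the notes describe.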

### Example

You can save the following YAML file locally and then run `pydolphinscheduler yaml -f data-transfer.yaml` to try the demo.

```yaml
# Define the workflow
workflow:
  name: "data-transfer"
  run: true

# Define the tasks under the workflow
tasks:
  - name: output
    task_type: Shell
    command: |
      mkdir -p data/test1 data/test2
      echo "test1 message" >> data/test1/text.txt
      echo "test2 message" >> data/test2/text.txt
      tree .
    local_params:
      - { "prop": "dir-data", "direct": "OUT", "type": "FILE", "value": "data" }
      - { "prop": "file-text", "direct": "OUT", "type": "FILE", "value": "data/test2/text.txt" }

  - name: input_dir
    task_type: Shell
    deps: [output]
    command: |
      tree .
      cat input_dir/test1/text.txt
      cat input_dir/test2/text.txt
    local_params:
      - { "prop": "input_dir", "direct": "IN", "type": "FILE", "value": "output.dir-data" }

  - name: input_file
    task_type: Shell
    deps: [output]
    command: |
      tree .
      cat input.txt
    local_params:
      - { "prop": "input.txt", "direct": "IN", "type": "FILE", "value": "output.file-text" }
```
101 changes: 101 additions & 0 deletions docs/docs/zh/guide/parameter/file-parameter.md
@@ -0,0 +1,101 @@
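# File Parameter

By configuring file parameters, you can pass files (or folders, both referred to below as **files**) from the working directory of an upstream task to a downstream task in the same workflow instance. Typical scenarios include:

- In an ETL task, pass the data files processed by multiple upstream tasks together to a specific downstream task.
- In a machine learning scenario, pass the dataset file of the upstream data preparation task to the downstream model training task.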

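## Usage

### Configure file parameter

To configure a file parameter, click the plus sign to the right of "Custom Parameters" on the task definition page.

### Output file to downstream task

**The four custom parameter options are:**

- Parameter name: the identifier used when passing the file between tasks, such as `KEY1` and `KEY2` in the figure below
- Direction: OUT, which means outputting the file to the downstream task
- Parameter type: FILE, indicating a file parameter
- Parameter value: the output file path, such as `data` and `data/test2/text.txt` in the figure below

The configuration in the figure below indicates that the `output` task passes two files to downstream tasks:

- Pass out the folder `data` and mark it as `dir-data`. The downstream task can get this folder through `output.dir-data`
- Pass out the file `data/test2/text.txt` and mark it as `file-text`. The downstream task can get this file through `output.file-text`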

![img.png](../../../../img/new_ui/dev/parameter/file_parameter_output.png)

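### Get the file from the upstream task

**The four custom parameter options are:**

- Parameter name: the path where the upstream file is saved after it is received, such as `input_dir` in the figure below
- Direction: IN, which means getting the file from the upstream task
- Parameter type: FILE, indicating a file parameter
- Parameter value: the identifier of the upstream file, in the format `taskName.KEY`. For example, `output.dir-data` in the figure below, where `output` is the name of the upstream task and `dir-data` is the file identifier output by that task

The configuration in the figure below indicates that the task gets the folder identified by `dir-data` from the upstream task `output` and saves it as `input_dir`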

![img.png](../../../../img/new_ui/dev/parameter/file_parameter_input_dir.png)

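The configuration in the figure below indicates that the task gets the file identified by `file-text` from the upstream task `output` and saves it as `input.txt`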

![img.png](../../../../img/new_ui/dev/parameter/file_parameter_input_file.png)

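## Other

### Note

- File transfer between upstream and downstream tasks uses the resource center as an intermediary, and the data is saved in the `DATA_TRANSFER` directory of the resource center. Therefore, **the resource center function must be enabled**; otherwise the file parameter function cannot be used. See [Resource Center Configuration Details](../resource/configuration.md) for details.
- The file naming rule is `DATA_TRANSFER/date/workflow code/workflow version_workflow instance ID/task name_task instance ID_file name`
- If the transferred file is a folder, it is packaged into a compressed file with the `.zip` suffix and then uploaded. The downstream task unzips it and saves it in the corresponding directory after receiving it
- To delete file data, delete the corresponding folder in the `DATA_TRANSFER` directory of the resource center. Deleting a date subdirectory directly deletes all the file data for that date
- If there is a task chain task1->task2->task3, the most downstream task task3 can also get the file data of task1
- One-to-many and many-to-one transfers are supported
- Frequent transfers of large numbers of files will affect system IO performance, depending on the amount of data transferred

### Example

You can save the following YAML file locally and then run `pydolphinscheduler yaml -f data-transfer.yaml` to try the demo.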

```yaml
# Define the workflow
workflow:
  name: "data-transfer"
  run: true

# Define the tasks under the workflow
tasks:
  - name: output
    task_type: Shell
    command: |
      mkdir -p data/test1 data/test2
      echo "test1 message" >> data/test1/text.txt
      echo "test2 message" >> data/test2/text.txt
      tree .
    local_params:
      - { "prop": "dir-data", "direct": "OUT", "type": "FILE", "value": "data" }
      - { "prop": "file-text", "direct": "OUT", "type": "FILE", "value": "data/test2/text.txt" }

  - name: input_dir
    task_type: Shell
    deps: [output]
    command: |
      tree .
      cat input_dir/test1/text.txt
      cat input_dir/test2/text.txt
    local_params:
      - { "prop": "input_dir", "direct": "IN", "type": "FILE", "value": "output.dir-data" }

  - name: input_file
    task_type: Shell
    deps: [output]
    command: |
      tree .
      cat input.txt
    local_params:
      - { "prop": "input.txt", "direct": "IN", "type": "FILE", "value": "output.file-text" }
```
@@ -270,6 +270,26 @@ public Result<Object> deleteResource(@Parameter(hidden = true) @RequestAttribute
        return resourceService.delete(loginUser, fullName, tenantCode);
    }

    /**
     * delete DATA_TRANSFER data
     *
     * @param loginUser login user
     * @param days delete data dated N days ago or earlier
     * @return delete result code
     */
    @Operation(summary = "deleteDataTransferData", description = "Delete DATA_TRANSFER data dated N days ago or earlier")
    @Parameters({
            @Parameter(name = "days", description = "N days ago", required = true, schema = @Schema(implementation = Integer.class))
    })
    @DeleteMapping(value = "/data-transfer-delete")
    @ResponseStatus(HttpStatus.OK)
    @ApiException(DELETE_RESOURCE_ERROR)
    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
    public Map<String, Object> deleteDataTransferData(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
                                                      @RequestParam(value = "days") Integer days,
                                                      @RequestParam(value = "tenantCode", required = false) String tenantCode) throws Exception {
        // tenantCode is accepted but not used here; the service resolves the tenant from the login user
        return resourceService.deleteDataTransferData(loginUser, days);
    }

    /**
     * verify resource by alias and type
     *
@@ -228,6 +228,14 @@ Result<Object> updateResourceContent(User loginUser, String fullName, String ten
     */
    Resource queryResourcesFileInfo(String userName, String fullName);

    /**
     * delete DATA_TRANSFER data in the resource center
     *
     * @param loginUser user who deletes the data
     * @param days number of days; data dated this many days ago or earlier is deleted
     */
    Map<String, Object> deleteDataTransferData(User loginUser, Integer days);

    /**
     * unauthorized file
     *
@@ -71,9 +71,12 @@
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;

import java.io.File;
import java.io.IOException;
import java.rmi.ServerException;
import java.text.MessageFormat;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -1871,6 +1874,61 @@ public Resource queryResourcesFileInfo(String userName, String fileName) {
        return (Resource) resourceResponse.getData();
    }

    @Override
    public Map<String, Object> deleteDataTransferData(User loginUser, Integer days) {
        Map<String, Object> result = new HashMap<>();

        User user = userMapper.selectById(loginUser.getId());
        if (user == null) {
            logger.error("user {} not exists", loginUser.getId());
            putMsg(result, Status.USER_NOT_EXIST, loginUser.getId());
            // return the populated result so the caller sees the error status
            return result;
        }

        Tenant tenant = tenantMapper.queryById(user.getTenantId());
        if (tenant == null) {
            logger.error("tenant not exists");
            putMsg(result, Status.CURRENT_LOGIN_USER_TENANT_NOT_EXIST);
            return result;
        }
        String tenantCode = tenant.getTenantCode();

        String baseFolder = storageOperate.getResourceFileName(tenantCode, "DATA_TRANSFER");

        // cutoff date as yyyyMMdd, matching the names of the date subdirectories
        LocalDateTime now = LocalDateTime.now();
        now = now.minus(days, ChronoUnit.DAYS);
        String deleteDate = now.toLocalDate().toString().replace("-", "");
        List<StorageEntity> storageEntities;
        try {
            storageEntities = new ArrayList<>(
                    storageOperate.listFilesStatus(baseFolder, baseFolder, tenantCode, ResourceType.FILE));
        } catch (Exception e) {
            logger.error("delete data transfer data error", e);
            putMsg(result, Status.DELETE_RESOURCE_ERROR);
            return result;
        }

        List<String> successList = new ArrayList<>();
        List<String> failList = new ArrayList<>();
        for (StorageEntity storageEntity : storageEntities) {
            File path = new File(storageEntity.getFullName());
            String date = path.getName();
            // fixed-width yyyyMMdd strings compare lexicographically in date order
            if (date.compareTo(deleteDate) <= 0) {
                try {
                    storageOperate.delete(storageEntity.getFullName(), true);
                    successList.add(storageEntity.getFullName());
                } catch (Exception ex) {
                    logger.error("delete data transfer data {} error, please delete it manually", date, ex);
                    failList.add(storageEntity.getFullName());
                }
            }
        }
        putMsg(result, Status.SUCCESS);
        result.put("successList", successList);
        result.put("failList", failList);
        return result;
    }

    /**
     * unauthorized file
     *
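
The date cutoff used by `deleteDataTransferData` can be tried in isolation with the sketch below. It is a simplified stand-in rather than the service code: the class `DataTransferCutoff` and the method `expired` are hypothetical, `today` is passed in so the result is reproducible, and folder names are assumed to be `yyyyMMdd` strings as in the method above.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public final class DataTransferCutoff {
    private DataTransferCutoff() {
    }

    // Returns the folder names (yyyyMMdd) dated on or before today minus `days`.
    public static List<String> expired(List<String> dateFolders, LocalDate today, int days) {
        String cutoff = today.minusDays(days).format(DateTimeFormatter.BASIC_ISO_DATE);
        List<String> result = new ArrayList<>();
        for (String name : dateFolders) {
            // Fixed-width yyyyMMdd strings sort lexicographically in date order.
            if (name.compareTo(cutoff) <= 0) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> folders = Arrays.asList("20221020", "20221024", "20221027");
        // With today = 2022-10-27 and days = 3, the cutoff is 20221024 (inclusive).
        System.out.println(expired(folders, LocalDate.of(2022, 10, 27), 3));
        // prints "[20221020, 20221024]"
    }
}
```

Note that the comparison is inclusive, so `days = 0` would also select the folder for the current date.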
8 changes: 8 additions & 0 deletions dolphinscheduler-bom/pom.xml
@@ -98,6 +98,7 @@
        <htrace.version>4.1.1</htrace.version>
        <datasync.version>2.17.282</datasync.version>
        <springdoc-openapi-ui.version>1.6.9</springdoc-openapi-ui.version>
        <zt-zip.version>1.15</zt-zip.version>
    </properties>

    <dependencyManagement>
@@ -754,6 +755,13 @@
                <artifactId>springdoc-openapi-ui</artifactId>
                <version>${springdoc-openapi-ui.version}</version>
            </dependency>

            <dependency>
                <groupId>org.zeroturnaround</groupId>
                <artifactId>zt-zip</artifactId>
                <version>${zt-zip.version}</version>
            </dependency>

        </dependencies>
    </dependencyManagement>
</project>
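
The documentation in this commit says a transferred folder is packed into a `.zip` file before upload and unpacked by the downstream task, which is presumably what the new zt-zip dependency is for. The sketch below shows that round trip using only `java.util.zip` from the JDK, as a stand-in and not the zt-zip API or the commit's actual implementation. The class `FolderZip` and its methods are hypothetical names.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

public final class FolderZip {
    private FolderZip() {
    }

    // Packs every regular file under `dir` into `zip`, with entry names relative to `dir`.
    // Empty directories are not preserved in this sketch.
    public static void pack(Path dir, Path zip) throws IOException {
        List<Path> files;
        try (Stream<Path> walk = Files.walk(dir)) {
            files = walk.filter(Files::isRegularFile).collect(Collectors.toList());
        }
        try (ZipOutputStream out = new ZipOutputStream(Files.newOutputStream(zip))) {
            for (Path file : files) {
                String name = dir.relativize(file).toString().replace('\\', '/');
                out.putNextEntry(new ZipEntry(name));
                Files.copy(file, out);
                out.closeEntry();
            }
        }
    }

    // Unpacks `zip` into `target`, rejecting entries that would escape `target`.
    public static void unpack(Path zip, Path target) throws IOException {
        Path root = target.toAbsolutePath().normalize();
        try (ZipInputStream in = new ZipInputStream(Files.newInputStream(zip))) {
            ZipEntry entry;
            while ((entry = in.getNextEntry()) != null) {
                Path dest = root.resolve(entry.getName()).normalize();
                if (!dest.startsWith(root)) {
                    throw new IOException("entry outside target: " + entry.getName());
                }
                if (entry.isDirectory()) {
                    Files.createDirectories(dest);
                } else {
                    Files.createDirectories(dest.getParent());
                    Files.copy(in, dest, StandardCopyOption.REPLACE_EXISTING);
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Round trip in a temporary directory: data/ -> data.zip -> input_dir/
        Path base = Files.createTempDirectory("folder-zip");
        Path src = Files.createDirectories(base.resolve("data").resolve("test1"));
        Files.write(src.resolve("text.txt"), "test1 message".getBytes(StandardCharsets.UTF_8));
        pack(base.resolve("data"), base.resolve("data.zip"));
        unpack(base.resolve("data.zip"), base.resolve("input_dir"));
        Path copied = base.resolve("input_dir").resolve("test1").resolve("text.txt");
        System.out.println(new String(Files.readAllBytes(copied), StandardCharsets.UTF_8));
    }
}
```

The path check in `unpack` is a design choice of this sketch: it guards against archive entries such as `../x` that would write outside the target directory.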
1 change: 1 addition & 0 deletions dolphinscheduler-dist/release-docs/LICENSE
@@ -498,6 +498,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
sdk-core 2.17.282: https://mvnrepository.com/artifact/software.amazon.awssdk/sdk-core/2.17.282, Apache 2.0
third-party-jackson-core 2.17.282: https://mvnrepository.com/artifact/software.amazon.awssdk/third-party-jackson-core/2.17.282, Apache 2.0
utils 2.17.282: https://mvnrepository.com/artifact/software.amazon.awssdk/utils/2.17.282, Apache 2.0
zt-zip 1.15: https://github.com/zeroturnaround/zt-zip/blob/master/LICENSE, Apache 2.0

========================================================================
BSD licenses