[Feature][Task] Transfer files between tasks #12552

Merged · merged 5 commits · Nov 3, 2022 · Changes from 4 commits
102 changes: 102 additions & 0 deletions docs/docs/en/guide/parameter/file-parameter.md
@@ -0,0 +1,102 @@
# FILE Parameter

Use the file parameter to pass files (or folders, hereinafter collectively referred to as **files**) in the working directory of an upstream task to downstream tasks in the same workflow instance. Typical scenarios include:

- In ETL tasks, pass the data files processed by multiple upstream tasks to a specific downstream task.
- In machine learning scenarios, pass the dataset file of the upstream data preparation task to the downstream model training task.

## Usage

### Configure file parameter

To configure a file parameter, open the task definition page and click the plus sign to the right of "Custom Parameters".

### Output file to downstream task

**The four custom parameter fields are:**

- Parameter name: the identifier used when passing files between tasks, such as `KEY1` and `KEY2` in the figure below
- Direction: OUT, which means the file is output to downstream tasks
- Parameter type: FILE, indicating a file parameter
- Parameter value: the output file path, such as `data` and `data/test2/text.txt` in the figure below

The configuration in the figure below indicates that the task `output` passes two files to downstream tasks:

- Pass out the folder `data` and mark it as `dir-data`; the downstream task can get this folder through `output.dir-data`
- Pass out the file `data/test2/text.txt` and mark it as `file-text`; the downstream task can get this file through `output.file-text`

![img.png](../../../../img/new_ui/dev/parameter/file_parameter_output.png)
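
For reference, the same two OUT parameters appear as `local_params` in the pydolphinscheduler YAML demo at the end of this page:

```yaml
# OUT file parameters of the "output" task, as used in the demo below.
local_params:
  - { "prop": "dir-data", "direct": "OUT", "type": "FILE", "value": "data" }
  - { "prop": "file-text", "direct": "OUT", "type": "FILE", "value": "data/test2/text.txt" }
```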

### Get the file from the upstream task

**The four custom parameter fields are:**

- Parameter name: the path where the upstream file is saved after it is fetched, such as `input_dir` used in the figure below
- Direction: IN, which means the file is fetched from an upstream task
- Parameter type: FILE, indicating a file parameter
- Parameter value: the identifier of the upstream file, in the format `taskName.KEY`. For example, `output.dir-data` in the figure below, where `output` is the name of the upstream task and `dir-data` is the file identifier output by that task

The configuration in the figure below indicates that the task gets the folder identified by `dir-data` from the upstream task `output` and saves it as `input_dir`

![img.png](../../../../img/new_ui/dev/parameter/file_parameter_input_dir.png)

The configuration in the figure below indicates that the task gets the file identified by `file-text` from the upstream task `output` and saves it as `input.txt`

![img.png](../../../../img/new_ui/dev/parameter/file_parameter_input_file.png)
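
For reference, the corresponding IN parameters from the pydolphinscheduler YAML demo at the end of this page (in the demo they belong to two separate downstream tasks):

```yaml
# IN file parameters: fetch the folder identified by dir-data and save it as
# input_dir, and fetch the file identified by file-text and save it as input.txt.
local_params:
  - { "prop": "input_dir", "direct": "IN", "type": "FILE", "value": "output.dir-data" }
  - { "prop": "input.txt", "direct": "IN", "type": "FILE", "value": "output.file-text" }
```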

## Other

### Note

- File transfer between upstream and downstream tasks uses the resource center as intermediate storage, with the data saved under the resource center's `DATA_TRANSFER` directory. Therefore **the resource center must be enabled** (see [Resource Center Configuration Details](../resource/configuration.md) for details); otherwise the file parameter feature cannot be used.
- The file naming rule is `DATA_TRANSFER/DATE/ProcessDefineCode/ProcessDefineVersion_ProcessInstanceID/TaskName_TaskInstanceID_FileName` (see the path example after this list)
- If the transferred file data is a folder, it is packaged into a compressed file with a `.zip` suffix before upload; the downstream task unzips it and saves it in the corresponding directory after receiving it
- To delete file data, remove the corresponding folder in the `DATA_TRANSFER` directory of the resource center. Deleting a date subdirectory directly removes all file data under that date. You can also use the [Open API interface](../open-api.md) (`resources/data-transfer`) to delete data older than N days, as shown in the sketch after this list
- If there is a task chain task1->task2->task3, the downstream task task3 can also get the file data of task1
- One-to-many and many-to-one transfers are supported
- Frequently transferring large numbers of files will noticeably affect system I/O performance, in proportion to the amount of data transferred
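
For example, with hypothetical values — date `20221103`, workflow definition code `123456`, workflow version `1`, workflow instance ID `100`, task name `output`, task instance ID `200`, and file name `text.txt` — a transferred file would be stored at:

```
DATA_TRANSFER/20221103/123456/1_100/output_200_text.txt
```

(The date folder uses the `yyyyMMdd` form, matching the date comparison in the server-side cleanup code in this PR.)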
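A minimal sketch of the cleanup call via the Open API. The `DELETE /resources/data-transfer` endpoint and its required `days` parameter come from this PR; the host, port, base path, and token value are assumptions for a default standalone deployment — adjust them for your environment:

```shell
# Delete DATA_TRANSFER data from 7 or more days ago.
# Host, port, base path, and token are deployment-specific assumptions.
curl -X DELETE \
  -H "token: <your-access-token>" \
  "http://localhost:12345/dolphinscheduler/resources/data-transfer?days=7"
```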

### Example

You can save the following YAML file locally and then execute `pydolphinscheduler yaml -f data-transfer.yaml` to run the demo.

```yaml
# Define the workflow
workflow:
  name: "data-transfer"
  run: true

# Define the tasks under the workflow
tasks:
  - name: output
    task_type: Shell
    command: |
      mkdir -p data/test1 data/test2
      echo "test1 message" >> data/test1/text.txt
      echo "test2 message" >> data/test2/text.txt
      tree .
    local_params:
      - { "prop": "dir-data", "direct": "OUT", "type": "FILE", "value": "data" }
      - { "prop": "file-text", "direct": "OUT", "type": "FILE", "value": "data/test2/text.txt" }

  - name: input_dir
    task_type: Shell
    deps: [output]
    command: |
      tree .
      cat input_dir/test1/text.txt
      cat input_dir/test2/text.txt
    local_params:
      - { "prop": "input_dir", "direct": "IN", "type": "FILE", "value": "output.dir-data" }

  - name: input_file
    task_type: Shell
    deps: [output]
    command: |
      tree .
      cat input.txt
    local_params:
      - { "prop": "input.txt", "direct": "IN", "type": "FILE", "value": "output.file-text" }
```
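
If the demo runs as described above, the working directory of the `input_dir` task should contain the transferred folder, unzipped and saved under the parameter name. A hypothetical `tree .` listing for that task:

```
.
└── input_dir
    ├── test1
    │   └── text.txt
    └── test2
        └── text.txt
```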

101 changes: 101 additions & 0 deletions docs/docs/zh/guide/parameter/file-parameter.md
@@ -0,0 +1,101 @@
# FILE Parameter

Use the file parameter to pass files (or folders, hereinafter collectively referred to as **files**) in the working directory of an upstream task to downstream tasks in the same workflow instance. Typical scenarios include:

- In ETL tasks, pass the data files processed by multiple upstream tasks to a specific downstream task.
- In machine learning scenarios, pass the dataset file of the upstream data preparation task to the downstream model training task.

## Usage

### Configure file parameter

To configure a file parameter, open the task definition page and click the plus sign to the right of "Custom Parameters".

### Output file to downstream task

**The four custom parameter fields are:**

- Parameter name: the identifier used when passing files between tasks, such as `KEY1` and `KEY2` in the figure below
- Direction: OUT, which means the file is output to downstream tasks
- Parameter type: FILE, indicating a file parameter
- Parameter value: the output file path, such as `data` and `data/test2/text.txt` in the figure below

The configuration in the figure below indicates that the task `output` passes two files to downstream tasks:

- Pass out the folder `data` and mark it as `dir-data`; the downstream task can get this folder through `output.dir-data`
- Pass out the file `data/test2/text.txt` and mark it as `file-text`; the downstream task can get this file through `output.file-text`

![img.png](../../../../img/new_ui/dev/parameter/file_parameter_output.png)

### Get the file from the upstream task

**The four custom parameter fields are:**

- Parameter name: the path where the upstream file is saved after it is fetched, such as `input_dir` used in the figure below
- Direction: IN, which means the file is fetched from an upstream task
- Parameter type: FILE, indicating a file parameter
- Parameter value: the identifier of the upstream file, in the format `taskName.KEY`. For example, `output.dir-data` in the figure below, where `output` is the name of the upstream task and `dir-data` is the file identifier output by that task

The configuration in the figure below indicates that the task gets the folder identified by `dir-data` from the upstream task `output` and saves it as `input_dir`

![img.png](../../../../img/new_ui/dev/parameter/file_parameter_input_dir.png)

The configuration in the figure below indicates that the task gets the file identified by `file-text` from the upstream task `output` and saves it as `input.txt`

![img.png](../../../../img/new_ui/dev/parameter/file_parameter_input_file.png)

## Other

### Note

- File transfer between upstream and downstream tasks uses the resource center as intermediate storage, with the data saved under the resource center's `DATA_TRANSFER` directory. Therefore **the resource center must be enabled** (see [Resource Center Configuration Details](../resource/configuration.md) for details); otherwise the file parameter feature cannot be used.
- The file naming rule is `DATA_TRANSFER/DATE/ProcessDefineCode/ProcessDefineVersion_ProcessInstanceID/TaskName_TaskInstanceID_FileName`
- If the transferred file data is a folder, it is packaged into a compressed file with a `.zip` suffix before upload; the downstream task unzips it and saves it in the corresponding directory after receiving it
- To delete file data, remove the corresponding folder in the `DATA_TRANSFER` directory of the resource center. Deleting a date subdirectory directly removes all file data under that date. You can also use the `resources/data-transfer` [Open API interface](../open-api.md) (delete data older than N days) to delete the corresponding file data.
- If there is a task chain task1->task2->task3, the most downstream task task3 can also get the file data of task1
- One-to-many and many-to-one transfers are supported
- Frequently transferring large numbers of files will noticeably affect system I/O performance, in proportion to the amount of data transferred

### Example

You can save the following YAML file locally and then execute `pydolphinscheduler yaml -f data-transfer.yaml` to run the demo.

```yaml
# Define the workflow
workflow:
  name: "data-transfer"
  run: true

# Define the tasks under the workflow
tasks:
  - name: output
    task_type: Shell
    command: |
      mkdir -p data/test1 data/test2
      echo "test1 message" >> data/test1/text.txt
      echo "test2 message" >> data/test2/text.txt
      tree .
    local_params:
      - { "prop": "dir-data", "direct": "OUT", "type": "FILE", "value": "data" }
      - { "prop": "file-text", "direct": "OUT", "type": "FILE", "value": "data/test2/text.txt" }

  - name: input_dir
    task_type: Shell
    deps: [output]
    command: |
      tree .
      cat input_dir/test1/text.txt
      cat input_dir/test2/text.txt
    local_params:
      - { "prop": "input_dir", "direct": "IN", "type": "FILE", "value": "output.dir-data" }

  - name: input_file
    task_type: Shell
    deps: [output]
    command: |
      tree .
      cat input.txt
    local_params:
      - { "prop": "input.txt", "direct": "IN", "type": "FILE", "value": "output.file-text" }
```

@@ -42,6 +42,7 @@
import static org.apache.dolphinscheduler.api.enums.Status.VIEW_UDF_FUNCTION_ERROR;

import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
import org.apache.dolphinscheduler.api.dto.resources.DeleteDataTransferResponse;
import org.apache.dolphinscheduler.api.exceptions.ApiException;
import org.apache.dolphinscheduler.api.service.ResourcesService;
import org.apache.dolphinscheduler.api.service.UdfFuncService;
@@ -270,6 +271,25 @@ public Result<Object> deleteResource(@Parameter(hidden = true) @RequestAttribute
return resourceService.delete(loginUser, fullName, tenantCode);
}

/**
* delete DATA_TRANSFER data
*
* @param loginUser login user
* @param days data older than this number of days is deleted
* @return delete result code
*/
@Operation(summary = "deleteDataTransferData", description = "Delete DATA_TRANSFER data older than N days")
@Parameters({
@Parameter(name = "days", description = "N days ago", required = true, schema = @Schema(implementation = Integer.class))
})
@DeleteMapping(value = "/data-transfer")
@ResponseStatus(HttpStatus.OK)
@ApiException(DELETE_RESOURCE_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public DeleteDataTransferResponse deleteDataTransferData(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "days") Integer days) {
return resourceService.deleteDataTransferData(loginUser, days);
}

/**
* verify resource by alias and type
*
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.api.dto.resources;

import org.apache.dolphinscheduler.api.utils.Result;

import java.util.List;

import lombok.Data;

@Data
public class DeleteDataTransferResponse extends Result {

private List<String> successList;

private List<String> failedList;

}
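
For illustration, a response from the delete endpoint might look like the following (hypothetical values; `code` and `msg` are inherited from `Result`):

```json
{
  "code": 0,
  "msg": "success",
  "successList": ["DATA_TRANSFER/20221101"],
  "failedList": []
}
```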
@@ -17,6 +17,7 @@

package org.apache.dolphinscheduler.api.service;

import org.apache.dolphinscheduler.api.dto.resources.DeleteDataTransferResponse;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.dao.entity.Resource;
@@ -230,6 +231,14 @@ Result<Object> updateResourceContent(User loginUser, String fullName, String ten
*/
Resource queryResourcesFileInfo(String userName, String fullName);

/**
* delete DATA_TRANSFER data in resource center
*
* @param loginUser user who queries the resource
* @param days data older than this number of days is deleted
* @return deletion result, including lists of deleted and failed paths
*/
DeleteDataTransferResponse deleteDataTransferData(User loginUser, Integer days);

/**
* unauthorized file
*
@@ -26,6 +26,7 @@
import static org.apache.dolphinscheduler.common.constants.Constants.JAR;
import static org.apache.dolphinscheduler.common.constants.Constants.PERIOD;

import org.apache.dolphinscheduler.api.dto.resources.DeleteDataTransferResponse;
import org.apache.dolphinscheduler.api.dto.resources.ResourceComponent;
import org.apache.dolphinscheduler.api.dto.resources.filter.ResourceFilter;
import org.apache.dolphinscheduler.api.dto.resources.visitor.ResourceTreeVisitor;
@@ -71,9 +72,12 @@
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;

import java.io.File;
import java.io.IOException;
import java.rmi.ServerException;
import java.text.MessageFormat;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -1871,6 +1875,64 @@ public Resource queryResourcesFileInfo(String userName, String fileName) {
return (Resource) resourceResponse.getData();
}

@Override
public DeleteDataTransferResponse deleteDataTransferData(User loginUser, Integer days) {
DeleteDataTransferResponse result = new DeleteDataTransferResponse();

User user = userMapper.selectById(loginUser.getId());
if (user == null) {
logger.error("user {} not exists", loginUser.getId());
putMsg(result, Status.USER_NOT_EXIST, loginUser.getId());
return result;
}

Tenant tenant = tenantMapper.queryById(user.getTenantId());
if (tenant == null) {
logger.error("tenant not exists");
putMsg(result, Status.CURRENT_LOGIN_USER_TENANT_NOT_EXIST);
return result;
}

String tenantCode = tenant.getTenantCode();

String baseFolder = storageOperate.getResourceFileName(tenantCode, "DATA_TRANSFER");

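// Compute the cutoff date (N days ago) as a yyyyMMdd string, matching the date-named folders under DATA_TRANSFER.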
LocalDateTime now = LocalDateTime.now();
now = now.minus(days, ChronoUnit.DAYS);
String deleteDate = now.toLocalDate().toString().replace("-", "");
List<StorageEntity> storageEntities;
try {
storageEntities = new ArrayList<>(
storageOperate.listFilesStatus(baseFolder, baseFolder, tenantCode, ResourceType.FILE));
} catch (Exception e) {
logger.error("delete data transfer data error", e);
putMsg(result, Status.DELETE_RESOURCE_ERROR);
return result;
}

List<String> successList = new ArrayList<>();
List<String> failList = new ArrayList<>();

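// Each entry under DATA_TRANSFER is a date-named folder; delete those dated on or before the cutoff.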
for (StorageEntity storageEntity : storageEntities) {
File path = new File(storageEntity.getFullName());
String date = path.getName();
if (date.compareTo(deleteDate) <= 0) {
try {
storageOperate.delete(storageEntity.getFullName(), true);
successList.add(storageEntity.getFullName());
} catch (Exception ex) {
logger.error("delete data transfer data {} error, please delete it manually", date, ex);
failList.add(storageEntity.getFullName());
}
}
}

result.setSuccessList(successList);
result.setFailedList(failList);
putMsg(result, Status.SUCCESS);
return result;
}

/**
* unauthorized file
*