Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug] [Api] Resource files need to be checked before submitting #13887

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ public enum Status {
RESOURCE_HAS_FOLDER(20018, "There are files or folders in the current directory:{0}", "当前目录下有文件或文件夹[{0}]"),

REMOVE_TASK_INSTANCE_CACHE_ERROR(20019, "remove task instance cache error", "删除任务实例缓存错误"),
TASK_RESOURCE_NOT_EXIST(20020, "Task {0} contains removed resource", "任务[{0}]含有被删除的资源文件"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it's better to return the resource doesn't exist


USER_NO_OPERATION_PERM(30001, "user has no operation privilege", "当前用户没有操作权限"),
USER_NO_OPERATION_PROJECT_PERM(30002, "user {0} is not has project {1} permission", "当前用户[{0}]没有[{1}]项目的操作权限"),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.api.resource;

import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.spi.enums.ResourceType;

import org.apache.commons.collections.CollectionUtils;

import java.util.Arrays;
import java.util.List;

import lombok.Getter;
import lombok.Setter;

import org.slf4j.Logger;

import com.google.common.base.Strings;

@Getter
@Setter
public class ResourceCheck {

/**
* logger
*/
private Logger logger;
/**
* Resource Type
*/
private ResourceType resourceType;

/**
* Process Service
*/
private ProcessService processService;

/**
* need check resourceIds
*/
private String resourceIds;

/**
* task name
*/
private String taskName;

/**
* resource exist check
* @param resourceType resource type
* @param processService process service
* @param resourceIds resource ids string with , combine
* @param taskName task name
* @param logger logger
*/
public ResourceCheck(ResourceType resourceType, ProcessService processService, String resourceIds, String taskName,
Logger logger) {
this.resourceType = resourceType;
this.processService = processService;
this.resourceIds = resourceIds;
this.taskName = taskName;
this.logger = logger;
}

/**
* check all resources exist,
* if contains removed resource throws ServiceException
*/
public void checkAllExist() throws ServiceException {
switch (resourceType) {
case FILE:
if (Strings.isNullOrEmpty(this.resourceIds)) {
logger.error("The given task definition has null resources str, taskName: {}", this.taskName);
return;
}

Integer[] resourceIdArray =
Arrays.stream(this.resourceIds.split(",")).map(Integer::parseInt).toArray(Integer[]::new);

if (resourceIdArray.length > 0) {
List<Resource> list = processService.listResourceByIds(resourceIdArray);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we have refactored the resource center in #12076, ResourceMapper has been deprecated. I think we should not do this way.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we have refactored the resource center in #12076, ResourceMapper has been deprecated. I think we should not do this way.

Add this pre-check and throw an explicit error msg during task execution, which one is better?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add this pre-check and throw an explicit error msg during task execution, which one is better?

If the user deletes files in third-party storage after passing the pre check, it also cannot avoid runtime errors...

Copy link
Member Author

@qingwli qingwli Apr 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In our scenario, we are currently using the 3.1.1 version and I don't know we have refactored about resource center in dev.

And for the 3.1.1 version, we encountered this situation.

Here are the prerequisites:

  1. User can't access third-party storage directly.
  2. User only can operate third-party by ds platform.
  3. We can't delete the resource that binds online released jobs.

process -> task job -> resource table -> third-party storage

And if a process is maintained by a team. One member A operates offline this process. Another member B can delete resource binds in this process by ds. Member A re-online this process and got successful. Will throw an exception during runtime. And If we add a pre-check will find this exception when online.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And I agree if the user deletes a third-party storage file directly and we can do nothing about this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code only takes effect in version 3.1.X. And I'm +0 on this. cc @ruanwenjun @zhongjiajie @EricGao888 @caishunfeng

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can post this change to 3.1.6, not dev. Because dev is totally different.

if (CollectionUtils.isEmpty(list) || list.size() != resourceIdArray.length) {
logger.error(
"The given task definition has deleted resources, taskName: {}, resourceIds: {}",
this.taskName, this.resourceIds);
throw new ServiceException(
Status.TASK_RESOURCE_NOT_EXIST, this.taskName);
}
}
break;
default:
logger.error("Error resourceType: {}", this.resourceType);
throw new ServiceException(Status.REQUEST_PARAMS_NOT_VALID_ERROR, this.resourceType);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.executor.ExecuteClient;
import org.apache.dolphinscheduler.api.executor.ExecuteContext;
import org.apache.dolphinscheduler.api.resource.ResourceCheck;
import org.apache.dolphinscheduler.api.service.ExecutorService;
import org.apache.dolphinscheduler.api.service.MonitorService;
import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
Expand Down Expand Up @@ -94,6 +95,7 @@
import org.apache.dolphinscheduler.service.exceptions.CronParseException;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.process.TriggerRelationService;
import org.apache.dolphinscheduler.spi.enums.ResourceType;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.collections4.CollectionUtils;
Expand Down Expand Up @@ -235,6 +237,20 @@ public Map<String, Object> execProcessInstance(User loginUser, long projectCode,
} else {
processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
}

// check if all resources exist
List<TaskDefinition> allDefinitionList =
taskDefinitionMapper.queryAllDefinitionList(processDefinition.getProjectCode());
for (TaskDefinition taskDefinition : allDefinitionList) {
try {
new ResourceCheck(ResourceType.FILE, processService, taskDefinition.getResourceIds(),
taskDefinition.getName(), log).checkAllExist();
} catch (ServiceException e) {
putMsg(result, Status.TASK_RESOURCE_NOT_EXIST, taskDefinition.getName());
return result;
}
}

// check process define release state
this.checkProcessDefinitionValid(projectCode, processDefinition, processDefinitionCode,
processDefinition.getVersion());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.dolphinscheduler.api.dto.workflow.WorkflowUpdateRequest;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.resource.ResourceCheck;
import org.apache.dolphinscheduler.api.service.MetricsCleanUpService;
import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
import org.apache.dolphinscheduler.api.service.ProcessInstanceService;
Expand Down Expand Up @@ -124,6 +125,7 @@
import org.apache.dolphinscheduler.plugin.task.api.parameters.SqlParameters;
import org.apache.dolphinscheduler.service.model.TaskNode;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.spi.enums.ResourceType;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -307,6 +309,18 @@ public Map<String, Object> createProcessDefinition(User loginUser,
}
List<TaskDefinitionLog> taskDefinitionLogs = generateTaskDefinitionList(taskDefinitionJson);
List<ProcessTaskRelationLog> taskRelationList = generateTaskRelationList(taskRelationJson, taskDefinitionLogs);

// check if all resources exist
for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
try {
new ResourceCheck(ResourceType.FILE, processService, processService.getResourceIds(taskDefinitionLog),
taskDefinitionLog.getName(), log).checkAllExist();
} catch (ServiceException e) {
putMsg(result, Status.TASK_RESOURCE_NOT_EXIST, taskDefinitionLog.getName());
return result;
}
}

int tenantId = -1;
if (!Constants.DEFAULT.equals(tenantCode)) {
Tenant tenant = tenantMapper.queryByTenantCode(tenantCode);
Expand Down Expand Up @@ -1168,6 +1182,15 @@ public Map<String, Object> releaseProcessDefinition(User loginUser, long project
putMsg(result, Status.PROCESS_DAG_IS_EMPTY);
return result;
}

// Check if all resources exist
DagData dagData = processService.genDagData(processDefinition);
List<TaskDefinition> taskDefinitionList = dagData.getTaskDefinitionList();
for (TaskDefinition taskDefinition : taskDefinitionList) {
new ResourceCheck(ResourceType.FILE, processService,
taskDefinition.getResourceIds(), taskDefinition.getName(), log).checkAllExist();
}

processDefinition.setReleaseState(releaseState);
processDefinitionMapper.updateById(processDefinition);
log.info("Set process definition online, projectCode:{}, processDefinitionCode:{}.", projectCode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.permission.PermissionCheck;
import org.apache.dolphinscheduler.api.resource.ResourceCheck;
import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
import org.apache.dolphinscheduler.api.service.ProcessTaskRelationService;
import org.apache.dolphinscheduler.api.service.ProjectService;
Expand Down Expand Up @@ -68,6 +69,7 @@
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.spi.enums.ResourceType;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
Expand Down Expand Up @@ -766,6 +768,16 @@ private TaskDefinitionLog updateTask(User loginUser, long projectCode, long task
taskDefinitionToUpdate.setTaskType(taskDefinitionToUpdate.getTaskType().toUpperCase());
taskDefinitionToUpdate.setResourceIds(processService.getResourceIds(taskDefinitionToUpdate));
taskDefinitionToUpdate.setUpdateTime(now);

// check if all resources exist
try {
new ResourceCheck(ResourceType.FILE, processService,
taskDefinitionToUpdate.getResourceIds(), taskDefinitionToUpdate.getName(), log).checkAllExist();
} catch (ServiceException e) {
putMsg(result, Status.TASK_RESOURCE_NOT_EXIST, taskDefinitionToUpdate.getName());
return null;
}

int update = taskDefinitionMapper.updateById(taskDefinitionToUpdate);
taskDefinitionToUpdate.setOperator(loginUser.getId());
taskDefinitionToUpdate.setOperateTime(now);
Expand Down Expand Up @@ -1290,6 +1302,15 @@ public Map<String, Object> releaseTaskDefinition(User loginUser, long projectCod
putMsg(result, Status.RESOURCE_NOT_EXIST_OR_NO_PERMISSION);
return result;
}

// check if all resources exist
try {
new ResourceCheck(ResourceType.FILE, processService,
taskDefinition.getResourceIds(), taskDefinition.getName(), log).checkAllExist();
} catch (ServiceException e) {
putMsg(result, Status.TASK_RESOURCE_NOT_EXIST, taskDefinition.getName());
return result;
}
}
taskDefinition.setFlag(Flag.YES);
taskDefinitionLog.setFlag(Flag.YES);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.resource;

import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.spi.enums.ResourceType;

import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.List;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* resource check test
*/
@RunWith(MockitoJUnitRunner.class)
public class ResourceCheckTest {

private static final Logger logger = LoggerFactory.getLogger(ResourceCheckTest.class);

@Mock
private ProcessService processService;

@Test
public void testResourceCheckNull() {
TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setResourceIds(null);
taskDefinition.setName("taskDefinition");

new ResourceCheck(ResourceType.FILE, processService, taskDefinition.getResourceIds(),
taskDefinition.getName(), logger).checkAllExist();
}

@Test
public void testResourceCheckNotExist() {
TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setResourceIds("1");
taskDefinition.setName("taskDefinition");
String error = null;

try {
new ResourceCheck(ResourceType.FILE, processService, taskDefinition.getResourceIds(),
taskDefinition.getName(), logger).checkAllExist();
} catch (ServiceException e) {
error = e.getMessage();
}

Assertions.assertEquals(error,
MessageFormat.format(Status.TASK_RESOURCE_NOT_EXIST.getMsg(), taskDefinition.getName()));
}

@Test
public void testResourceCheckExist() {
TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setResourceIds("1");
taskDefinition.setName("taskDefinition");
String error = null;

Integer[] resourceArr = {1};
List<Resource> list = new ArrayList<>();
list.add(new Resource());

Mockito.when(processService.listResourceByIds(resourceArr)).thenReturn(list);

try {
new ResourceCheck(ResourceType.FILE, processService, taskDefinition.getResourceIds(),
taskDefinition.getName(), logger).checkAllExist();
} catch (ServiceException e) {
error = e.getMessage();
}

Assertions.assertNull(error);
}

@Test
public void testResourceCheckUDF() {
TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setResourceIds(null);
taskDefinition.setName("taskDefinition");
String error = null;

try {
new ResourceCheck(ResourceType.UDF, processService, taskDefinition.getResourceIds(),
taskDefinition.getName(), logger).checkAllExist();
} catch (ServiceException e) {
error = e.getMessage();
}

Assertions.assertEquals(error,
MessageFormat.format(Status.REQUEST_PARAMS_NOT_VALID_ERROR.getMsg(), ResourceType.UDF));
}
}