Skip to content

Commit

Permalink
[Python] Supports creating or editing resources. (#10823)
Browse files Browse the repository at this point in the history
  • Loading branch information
hiSandog authored Jul 12, 2022
1 parent 499e5b1 commit 59cd861
Show file tree
Hide file tree
Showing 10 changed files with 318 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,21 @@ public Map<String, Object> queryResourcesFileInfo(String userName, String fullNa
return result;
}

/**
* create or update resource.
* If the folder is not already created, it will be
*
* @param userName user who create or update resource
* @param fullName The fullname of resource.Includes path and suffix.
* @param description description of resource
* @param resourceContent content of resource
* @return id of resource
*/
public Integer createOrUpdateResource(
String userName, String fullName, String description, String resourceContent) {
return resourceService.createOrUpdateResource(userName, fullName, description, resourceContent);
}

@PostConstruct
public void init() {
if (pythonGatewayConfiguration.getEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,30 @@ Result<Object> updateResource(User loginUser,
*/
Result<Object> onlineCreateResource(User loginUser, ResourceType type, String fileName, String fileSuffix, String desc, String content,int pid,String currentDirectory);

/**
* create or update resource.
* If the folder is not already created, it will be
*
* @param loginUser user who create or update resource
* @param fileFullName The full name of resource.Includes path and suffix.
* @param desc description of resource
* @param content content of resource
* @return create result code
*/
Result<Object> onlineCreateOrUpdateResourceWithDir(User loginUser, String fileFullName, String desc, String content);

/**
* create or update resource.
* If the folder is not already created, it will be
*
* @param userName user who create or update resource
* @param fullName The fullname of resource.Includes path and suffix.
* @param description description of resource
* @param resourceContent content of resource
* @return id of resource
*/
Integer createOrUpdateResource(String userName, String fullName, String description, String resourceContent);

/**
* updateProcessInstance resource
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,12 @@

import static org.apache.dolphinscheduler.common.Constants.ALIAS;
import static org.apache.dolphinscheduler.common.Constants.CONTENT;
import static org.apache.dolphinscheduler.common.Constants.EMPTY_STRING;
import static org.apache.dolphinscheduler.common.Constants.FOLDER_SEPARATOR;
import static org.apache.dolphinscheduler.common.Constants.FORMAT_SS;
import static org.apache.dolphinscheduler.common.Constants.FORMAT_S_S;
import static org.apache.dolphinscheduler.common.Constants.JAR;
import static org.apache.dolphinscheduler.common.Constants.PERIOD;

/**
* resources service impl
Expand Down Expand Up @@ -1117,6 +1119,96 @@ public Result<Object> onlineCreateResource(User loginUser, ResourceType type, St
return result;
}

/**
* create or update resource.
* If the folder is not already created, it will be
*
* @param loginUser user who create or update resource
* @param fileFullName The full name of resource.Includes path and suffix.
* @param desc description of resource
* @param content content of resource
* @return create result code
*/
@Override
@Transactional
public Result<Object> onlineCreateOrUpdateResourceWithDir(User loginUser, String fileFullName, String desc, String content) {
if (checkResourceExists(fileFullName, ResourceType.FILE.ordinal())) {
Resource resource = resourcesMapper.queryResource(fileFullName, ResourceType.FILE.ordinal()).get(0);
Result<Object> result = this.updateResourceContent(loginUser, resource.getId(), content);
if (result.getCode() == Status.SUCCESS.getCode()) {
resource.setDescription(desc);
Map<String, Object> resultMap = new HashMap<>();
for (Map.Entry<Object, Object> entry : new BeanMap(resource).entrySet()) {
if (!Constants.CLASS.equalsIgnoreCase(entry.getKey().toString())) {
resultMap.put(entry.getKey().toString(), entry.getValue());
}
}
result.setData(resultMap);
}
return result;
} else {
String resourceSuffix = fileFullName.substring(fileFullName.indexOf(PERIOD) + 1);
String fileNameWithSuffix = fileFullName.substring(fileFullName.lastIndexOf(FOLDER_SEPARATOR) + 1);
String resourceDir = fileFullName.replace(fileNameWithSuffix, EMPTY_STRING);
String resourceName = fileNameWithSuffix.replace(PERIOD + resourceSuffix, EMPTY_STRING);
String[] dirNames = resourceDir.split(FOLDER_SEPARATOR);
int pid = -1;
StringBuilder currDirPath = new StringBuilder();
for (String dirName : dirNames) {
if (StringUtils.isNotEmpty(dirName)) {
pid = queryOrCreateDirId(loginUser, pid, currDirPath.toString(), dirName);
currDirPath.append(FOLDER_SEPARATOR).append(dirName);
}
}
return this.onlineCreateResource(
loginUser, ResourceType.FILE, resourceName, resourceSuffix, desc, content, pid, currDirPath.toString());
}
}

@Override
@Transactional
public Integer createOrUpdateResource(String userName, String fullName, String description, String resourceContent) {
User user = userMapper.queryByUserNameAccurately(userName);
int suffixLabelIndex = fullName.indexOf(PERIOD);
if (suffixLabelIndex == -1) {
String msg = String.format("The suffix of file can not be empty : %s", fullName);
logger.error(msg);
throw new IllegalArgumentException(msg);
}
if (!fullName.startsWith(FOLDER_SEPARATOR)) {
fullName = FOLDER_SEPARATOR + fullName;
}
Result<Object> createResult = onlineCreateOrUpdateResourceWithDir(
user, fullName, description, resourceContent);
if (createResult.getCode() == Status.SUCCESS.getCode()) {
Map<String, Object> resultMap = (Map<String, Object>) createResult.getData();
return (int) resultMap.get("id");
}
String msg = String.format("Can not create or update resource : %s", fullName);
logger.error(msg);
throw new IllegalArgumentException(msg);
}

private int queryOrCreateDirId(User user, int pid, String currentDir, String dirName) {
String dirFullName = currentDir + FOLDER_SEPARATOR + dirName;
if (checkResourceExists(dirFullName, ResourceType.FILE.ordinal())) {
List<Resource> resourceList = resourcesMapper.queryResource(dirFullName, ResourceType.FILE.ordinal());
return resourceList.get(0).getId();
} else {
// create dir
Result<Object> createDirResult = this.createDirectory(
user, dirName, EMPTY_STRING, ResourceType.FILE, pid, currentDir);
if (createDirResult.getCode() == Status.SUCCESS.getCode()) {
Map<String, Object> resultMap = (Map<String, Object>) createDirResult.getData();
return (int) resultMap.get("id");
} else {
String msg = String.format("Can not create dir %s", dirFullName);
logger.error(msg);
throw new IllegalArgumentException(msg);
}
}
}

private void permissionPostHandle(ResourceType resourceType, User loginUser, Integer resourceId) {
AuthorizationType authorizationType = resourceType.equals(ResourceType.FILE) ? AuthorizationType.RESOURCE_FILE_ID : AuthorizationType.UDF_FILE;
permissionPostHandle(authorizationType, loginUser.getId(), Collections.singletonList(resourceId), logger);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,26 @@ public void testGetDependentInfo() {
Assert.assertEquals((long) result.get("taskDefinitionCode"), taskDefinition.getCode());
}

@Test
public void testCreateResource() {
User user = getTestUser();
String resourceDir = "/dir1/dir2/";
String resourceName = "test";
String resourceSuffix = "py";
String desc = "desc";
String content = "content";
String resourceFullName = resourceDir + resourceName + "." + resourceSuffix;

int resourceId = 3;

Mockito.when(resourcesService.createOrUpdateResource(user.getUserName(), resourceFullName, desc, content))
.thenReturn(resourceId);

int id = pythonGateway.createOrUpdateResource(
user.getUserName(), resourceFullName, desc, content);
Assert.assertEquals(id, resourceId);
}


@Test
public void testQueryResourcesFileInfo() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,64 @@ public void testOnlineCreateResource() {

}

@Test
public void testOnlineCreateResourceWithDir() {
User user = getUser();
user.setId(1);

String dir1Path = "/dir1";
String dir2Path = "/dir2";
String resourceDir = dir1Path + dir2Path;
String resourceName = "test";
String resourceSuffix = "py";
String desc = "desc";
String content = "content";
String fullName = resourceDir + "/" + resourceName + "." + resourceSuffix;

Resource dir1 = new Resource();
dir1.setFullName(dir1Path);
dir1.setId(1);
dir1.setUserId(user.getId());
Resource dir2 = new Resource();
dir2.setFullName(resourceDir);
dir2.setUserId(user.getId());
Mockito.when(resourcesMapper.queryResource(dir1.getFullName(), ResourceType.FILE.ordinal())).thenReturn(Collections.singletonList(dir1));
Mockito.when(resourcesMapper.queryResource(resourceDir, ResourceType.FILE.ordinal())).thenReturn(null);
PowerMockito.when(resourcePermissionCheckService.operationPermissionCheck(
AuthorizationType.RESOURCE_FILE_ID, 1, ApiFuncIdentificationConstant.FILE_VIEW, serviceLogger)).thenReturn(true);
PowerMockito.when(resourcePermissionCheckService.resourcePermissionCheck(
AuthorizationType.RESOURCE_FILE_ID, new Object[]{dir1.getId()}, 1, serviceLogger)).thenReturn(true);

Tenant tenant = getTenant();
PowerMockito.when(resourcePermissionCheckService.operationPermissionCheck(
AuthorizationType.RESOURCE_FILE_ID, 1, ApiFuncIdentificationConstant.FOLDER_ONLINE_CREATE, serviceLogger)).thenReturn(true);
PowerMockito.when(resourcePermissionCheckService.resourcePermissionCheck(
AuthorizationType.RESOURCE_FILE_ID, null, 1, serviceLogger)).thenReturn(true);
try {
PowerMockito.when(storageOperate.mkdir(tenant.getTenantCode(), null)).thenReturn(true);
} catch (IOException e) {
logger.error("storage error", e);
}

PowerMockito.when(resourcePermissionCheckService.operationPermissionCheck(
AuthorizationType.RESOURCE_FILE_ID, 1, ApiFuncIdentificationConstant.FILE_ONLINE_CREATE, serviceLogger)).thenReturn(true);
PowerMockito.when(resourcePermissionCheckService.resourcePermissionCheck(
AuthorizationType.RESOURCE_FILE_ID, null, 1, serviceLogger)).thenReturn(true);
PowerMockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(true);
PowerMockito.when(resourcePermissionCheckService.operationPermissionCheck(
AuthorizationType.RESOURCE_FILE_ID, 1, ApiFuncIdentificationConstant.FILE_RENAME, serviceLogger)).thenReturn(true);
PowerMockito.when(resourcePermissionCheckService.resourcePermissionCheck(
AuthorizationType.RESOURCE_FILE_ID, null, 1, serviceLogger)).thenReturn(true);
Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant());
Mockito.when(resourcesMapper.selectById(dir1.getId())).thenReturn(dir1);
Mockito.when(resourcesMapper.selectById(dir2.getId())).thenReturn(dir2);
Mockito.when(FileUtils.getUploadFilename(Mockito.anyString(), Mockito.anyString())).thenReturn("test");
PowerMockito.when(FileUtils.writeContent2File(Mockito.anyString(), Mockito.anyString())).thenReturn(true);

Result<Object> result = resourcesService.onlineCreateOrUpdateResourceWithDir(user, fullName, desc, content);
Assert.assertEquals(Status.SUCCESS.getMsg(), result.getMsg());
}

@Test
public void testUpdateResourceContent() {
PowerMockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(false);
Expand Down Expand Up @@ -896,7 +954,7 @@ private Resource getResource(int resourceId) {
return resource;
}

private Resource getResource(int resourceId,ResourceType type) {
private Resource getResource(int resourceId, ResourceType type) {

Resource resource = new Resource();
resource.setId(resourceId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,11 @@ private Constants() {
*/
public static final String COLON = ":";

/**
* period .
*/
public static final String PERIOD = ".";

/**
* QUESTION ?
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ class ProcessDefinition(Base):
thought Web UI after it :func:`submit` or :func:`run`. It will create a new project belongs to
``user`` if it does not exists. And when ``project`` exists but project's create do not belongs
to ``user``, will grant `project` to ``user`` automatically.
:param resource_list: Resource files required by the current process definition.You can create and modify
resource files from this field. When the process definition is submitted, these resource files are also
submitted along with it.
"""

# key attribute for identify ProcessDefinition object
Expand All @@ -88,6 +91,7 @@ class ProcessDefinition(Base):
"tasks",
"task_definition_json",
"task_relation_json",
"resource_list",
}

def __init__(
Expand All @@ -107,6 +111,7 @@ def __init__(
timeout: Optional[int] = 0,
release_state: Optional[str] = configuration.WORKFLOW_RELEASE_STATE,
param: Optional[Dict] = None,
resource_list: Optional[List] = None,
):
super().__init__(name, description)
self.schedule = schedule
Expand All @@ -132,6 +137,7 @@ def __init__(
# TODO how to fix circle import
self._task_relations: set["TaskRelation"] = set() # noqa: F821
self._process_definition_code = None
self.resource_list = resource_list or []

def __enter__(self) -> "ProcessDefinition":
ProcessDefinitionContext.set(self)
Expand Down Expand Up @@ -407,6 +413,14 @@ def submit(self) -> int:
None,
None,
)
if len(self.resource_list) > 0:
for res in self.resource_list:
gateway.entry_point.createOrUpdateResource(
self._user,
res.name,
res.description,
res.content,
)
return self._process_definition_code

def start(self) -> None:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Module resource."""

from typing import Optional

from pydolphinscheduler.core.base import Base


class Resource(Base):
"""resource object, will define the resources that you want to create or update.
:param name: The fullname of resource.Includes path and suffix.
:param content: The description of resource.
:param description: The description of resource.
"""

_DEFINE_ATTR = {"name", "content", "description"}

def __init__(
self,
name: str,
content: str,
description: Optional[str] = None,
):
super().__init__(name, description)
self.content = content
self._resource_code = None
Loading

0 comments on commit 59cd861

Please sign in to comment.