Skip to content

Commit

Permalink
[python] Add resource plugin for python, dataX, CustomDataX and Sql (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
xdu-chenrj authored and Chris-Arith committed Oct 26, 2022
1 parent a0c6c66 commit 0ccb0bf
Show file tree
Hide file tree
Showing 8 changed files with 239 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,5 @@ class Symbol(str):

SLASH = "/"
POINT = "."
COMMA = ","
UNDERLINE = "_"
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@

"""DolphinScheduler Task and TaskRelation object."""
import copy
import types
from logging import getLogger
from typing import Dict, List, Optional, Sequence, Set, Tuple, Union

from pydolphinscheduler import configuration
from pydolphinscheduler.constants import (
Delimiter,
ResourceKey,
Symbol,
TaskFlag,
TaskPriority,
TaskTimeoutFlag,
Expand Down Expand Up @@ -114,7 +116,7 @@ class Task(Base):
_task_custom_attr: set = set()

ext: set = None
ext_attr: str = None
ext_attr: Union[str, types.FunctionType] = None

DEFAULT_CONDITION_RESULT = {"successNode": [""], "failedNode": [""]}

Expand Down Expand Up @@ -271,23 +273,26 @@ def get_content(self):
"""Get the file content according to the resource plugin."""
if self.ext_attr is None and self.ext is None:
return

_ext_attr = getattr(self, self.ext_attr)

if _ext_attr is not None:
if _ext_attr.endswith(tuple(self.ext)):
if isinstance(_ext_attr, str) and _ext_attr.endswith(tuple(self.ext)):
res = self.get_plugin()
content = res.read_file(_ext_attr)
setattr(self, self.ext_attr.lstrip("_"), content)
setattr(self, self.ext_attr.lstrip(Symbol.UNDERLINE), content)
else:
index = _ext_attr.rfind(".")
if index != -1:
raise ValueError(
"This task does not support files with suffix {}, only supports {}".format(
_ext_attr[index:], ",".join(str(suf) for suf in self.ext)
if self.resource_plugin is not None or (
self.process_definition is not None
and self.process_definition.resource_plugin is not None
):
index = _ext_attr.rfind(Symbol.POINT)
if index != -1:
raise ValueError(
"This task does not support files with suffix {}, only supports {}".format(
_ext_attr[index:],
Symbol.COMMA.join(str(suf) for suf in self.ext),
)
)
)
setattr(self, self.ext_attr.lstrip("_"), _ext_attr)
setattr(self, self.ext_attr.lstrip(Symbol.UNDERLINE), _ext_attr)

def __hash__(self):
return hash(self.code)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ class CustomDataX(Task):

_task_custom_attr = {"custom_config", "json", "xms", "xmx"}

ext: set = {".json"}
ext_attr: str = "_json"

def __init__(
self,
name: str,
Expand All @@ -43,9 +46,9 @@ def __init__(
*args,
**kwargs
):
self._json = json
super().__init__(name, TaskType.DATAX, *args, **kwargs)
self.custom_config = self.CUSTOM_CONFIG
self.json = json
self.xms = xms
self.xmx = xmx

Expand Down Expand Up @@ -76,6 +79,9 @@ class DataX(Task):
"xmx",
}

ext: set = {".sql"}
ext_attr: str = "_sql"

def __init__(
self,
name: str,
Expand All @@ -92,8 +98,8 @@ def __init__(
*args,
**kwargs
):
self._sql = sql
super().__init__(name, TaskType.DATAX, *args, **kwargs)
self.sql = sql
self.custom_config = self.CUSTOM_CONFIG
self.datasource_name = datasource_name
self.datatarget_name = datatarget_name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,48 +55,51 @@ def foo():
want to execute.
"""

_task_custom_attr = {
"raw_script",
}
_task_custom_attr = {"raw_script", "definition"}

ext: set = {".py"}
ext_attr: Union[str, types.FunctionType] = "_definition"

def __init__(
self, name: str, definition: Union[str, types.FunctionType], *args, **kwargs
):
self._definition = definition
super().__init__(name, TaskType.PYTHON, *args, **kwargs)
self.definition = definition

def _build_exe_str(self) -> str:
"""Build executable string from given definition.
Attribute ``self.definition`` almost is a function, we need to call this function after parsing it
to string. The easier way to call a function is using syntax ``func()`` and we use it to call it too.
"""
if isinstance(self.definition, types.FunctionType):
py_function = inspect.getsource(self.definition)
func_str = f"{py_function}{self.definition.__name__}()"
definition = getattr(self, "definition")
if isinstance(definition, types.FunctionType):
py_function = inspect.getsource(definition)
func_str = f"{py_function}{definition.__name__}()"
else:
pattern = re.compile("^def (\\w+)\\(")
find = pattern.findall(self.definition)
find = pattern.findall(definition)
if not find:
log.warning(
"Python definition is simple script instead of function, with value %s",
self.definition,
definition,
)
return self.definition
return definition
# Keep function str and function callable always have one blank line
func_str = (
f"{self.definition}{find[0]}()"
if self.definition.endswith("\n")
else f"{self.definition}\n{find[0]}()"
f"{definition}{find[0]}()"
if definition.endswith("\n")
else f"{definition}\n{find[0]}()"
)
return func_str

@property
def raw_script(self) -> str:
"""Get python task define attribute `raw_script`."""
if isinstance(self.definition, (str, types.FunctionType)):
if isinstance(getattr(self, "definition"), (str, types.FunctionType)):
return self._build_exe_str()
else:
raise PyDSParamException(
"Parameter definition do not support % for now.", type(self.definition)
"Parameter definition do not support % for now.",
type(getattr(self, "definition")),
)
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ class Sql(Task):
"display_rows",
}

ext: set = {".sql"}
ext_attr: str = "_sql"

def __init__(
self,
name: str,
Expand All @@ -71,8 +74,8 @@ def __init__(
*args,
**kwargs
):
self._sql = sql
super().__init__(name, TaskType.SQL, *args, **kwargs)
self.sql = sql
self.param_sql_type = sql_type
self.datasource_name = datasource_name
self.pre_statements = pre_statements or []
Expand Down Expand Up @@ -101,7 +104,7 @@ def sql_type(self) -> str:
"|(.* |)update |(.* |)truncate |(.* |)alter |(.* |)create ).*"
)
pattern_select = re.compile(pattern_select_str, re.IGNORECASE)
if pattern_select.match(self.sql) is None:
if pattern_select.match(self._sql) is None:
return SqlType.NOT_SELECT
else:
return SqlType.SELECT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,28 @@
# under the License.

"""Test Task DataX."""

from pathlib import Path
from unittest.mock import patch

import pytest

from pydolphinscheduler.resources_plugin import Local
from pydolphinscheduler.tasks.datax import CustomDataX, DataX
from pydolphinscheduler.utils import file
from tests.testing.file import delete_file


@pytest.fixture()
def setup_crt_first(request):
"""Set up and teardown about create file first and then delete it."""
file_content = request.param.get("file_content")
file_path = request.param.get("file_path")
file.write(
content=file_content,
to_path=file_path,
)
yield
delete_file(file_path)


@patch(
Expand Down Expand Up @@ -122,3 +138,76 @@ def test_custom_datax_get_define(json_template):
):
task = CustomDataX(name, json_template)
assert task.get_define() == expect


@pytest.mark.parametrize(
"setup_crt_first",
[
{
"file_path": Path(__file__).parent.joinpath("local_res.sql"),
"file_content": "test local resource",
}
],
indirect=True,
)
@pytest.mark.parametrize(
"attr, expect",
[
(
{
"name": "task_datax",
"datasource_name": "first_mysql",
"datatarget_name": "second_mysql",
"sql": "local_res.sql",
"target_table": "target_table",
"resource_plugin": Local(str(Path(__file__).parent)),
},
"test local resource",
),
],
)
@patch(
"pydolphinscheduler.core.task.Task.gen_code_and_version",
return_value=(123, 1),
)
def test_resources_local_datax_command_content(
mock_code_version, attr, expect, setup_crt_first
):
"""Test task datax sql content through the local resource plug-in."""
datax = DataX(**attr)
assert expect == getattr(datax, "sql")


@pytest.mark.parametrize(
"setup_crt_first",
[
{
"file_path": Path(__file__).parent.joinpath("local_res.json"),
"file_content": '{content: "test local resource"}',
}
],
indirect=True,
)
@pytest.mark.parametrize(
"attr, expect",
[
(
{
"name": "task_custom_datax",
"json": "local_res.json",
"resource_plugin": Local(str(Path(__file__).parent)),
},
'{content: "test local resource"}',
),
],
)
@patch(
"pydolphinscheduler.core.task.Task.gen_code_and_version",
return_value=(123, 1),
)
def test_resources_local_custom_datax_command_content(
mock_code_version, attr, expect, setup_crt_first
):
"""Test task CustomDataX json content through the local resource plug-in."""
custom_datax = CustomDataX(**attr)
assert expect == getattr(custom_datax, "json")
Loading

0 comments on commit 0ccb0bf

Please sign in to comment.