diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py index de5ce260027d..bedbbf2f5e1f 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py @@ -118,3 +118,5 @@ class Symbol(str): SLASH = "/" POINT = "." + COMMA = "," + UNDERLINE = "_" diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py index 26b71c186d16..3fec31fd6745 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py @@ -17,6 +17,7 @@ """DolphinScheduler Task and TaskRelation object.""" import copy +import types from logging import getLogger from typing import Dict, List, Optional, Sequence, Set, Tuple, Union @@ -24,6 +25,7 @@ from pydolphinscheduler.constants import ( Delimiter, ResourceKey, + Symbol, TaskFlag, TaskPriority, TaskTimeoutFlag, @@ -114,7 +116,7 @@ class Task(Base): _task_custom_attr: set = set() ext: set = None - ext_attr: str = None + ext_attr: Union[str, types.FunctionType] = None DEFAULT_CONDITION_RESULT = {"successNode": [""], "failedNode": [""]} @@ -271,23 +273,26 @@ def get_content(self): """Get the file content according to the resource plugin.""" if self.ext_attr is None and self.ext is None: return - _ext_attr = getattr(self, self.ext_attr) - if _ext_attr is not None: - if _ext_attr.endswith(tuple(self.ext)): + if isinstance(_ext_attr, str) and _ext_attr.endswith(tuple(self.ext)): res = self.get_plugin() content = res.read_file(_ext_attr) - setattr(self, self.ext_attr.lstrip("_"), content) + setattr(self, self.ext_attr.lstrip(Symbol.UNDERLINE), content) else: - index = _ext_attr.rfind(".") - if index != -1: - raise ValueError( - "This task does not support files with suffix {}, only supports {}".format( - _ext_attr[index:], ",".join(str(suf) for suf in self.ext) + if self.resource_plugin is not None or ( + self.process_definition is not None + and self.process_definition.resource_plugin is not None + ): + index = _ext_attr.rfind(Symbol.POINT) + if index != -1: + raise ValueError( + "This task does not support files with suffix {}, only supports {}".format( + _ext_attr[index:], + Symbol.COMMA.join(str(suf) for suf in self.ext), + ) ) - ) - setattr(self, self.ext_attr.lstrip("_"), _ext_attr) + setattr(self, self.ext_attr.lstrip(Symbol.UNDERLINE), _ext_attr) def __hash__(self): return hash(self.code) diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/datax.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/datax.py index f881a67de923..945f7824e4ad 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/datax.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/datax.py @@ -34,6 +34,9 @@ class CustomDataX(Task): _task_custom_attr = {"custom_config", "json", "xms", "xmx"} + ext: set = {".json"} + ext_attr: str = "_json" + def __init__( self, name: str, @@ -43,9 +46,9 @@ def __init__( *args, **kwargs ): + self._json = json super().__init__(name, TaskType.DATAX, *args, **kwargs) self.custom_config = self.CUSTOM_CONFIG - self.json = json self.xms = xms self.xmx = xmx @@ -76,6 +79,9 @@ class DataX(Task): "xmx", } + ext: set = {".sql"} + ext_attr: str = "_sql" + def __init__( self, name: str, @@ -92,8 +98,8 @@ def __init__( *args, **kwargs ): + self._sql = sql super().__init__(name, TaskType.DATAX, *args, **kwargs) - self.sql = sql self.custom_config = self.CUSTOM_CONFIG self.datasource_name = datasource_name self.datatarget_name = datatarget_name diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/python.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/python.py index 52903d48d93b..de16a9931c5d 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/python.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/python.py @@ -55,15 +55,16 @@ def foo(): want to execute. """ - _task_custom_attr = { - "raw_script", - } + _task_custom_attr = {"raw_script", "definition"} + + ext: set = {".py"} + ext_attr: Union[str, types.FunctionType] = "_definition" def __init__( self, name: str, definition: Union[str, types.FunctionType], *args, **kwargs ): + self._definition = definition super().__init__(name, TaskType.PYTHON, *args, **kwargs) - self.definition = definition def _build_exe_str(self) -> str: """Build executable string from given definition. @@ -71,32 +72,34 @@ def _build_exe_str(self) -> str: Attribute ``self.definition`` almost is a function, we need to call this function after parsing it to string. The easier way to call a function is using syntax ``func()`` and we use it to call it too. """ - if isinstance(self.definition, types.FunctionType): - py_function = inspect.getsource(self.definition) - func_str = f"{py_function}{self.definition.__name__}()" + definition = getattr(self, "definition") + if isinstance(definition, types.FunctionType): + py_function = inspect.getsource(definition) + func_str = f"{py_function}{definition.__name__}()" else: pattern = re.compile("^def (\\w+)\\(") - find = pattern.findall(self.definition) + find = pattern.findall(definition) if not find: log.warning( "Python definition is simple script instead of function, with value %s", - self.definition, + definition, ) - return self.definition + return definition # Keep function str and function callable always have one blank line func_str = ( - f"{self.definition}{find[0]}()" - if self.definition.endswith("\n") - else f"{self.definition}\n{find[0]}()" + f"{definition}{find[0]}()" + if definition.endswith("\n") + else f"{definition}\n{find[0]}()" ) return func_str @property def raw_script(self) -> str: """Get python task define attribute `raw_script`.""" - if isinstance(self.definition, (str, types.FunctionType)): + if isinstance(getattr(self, "definition"), (str, types.FunctionType)): return self._build_exe_str() else: raise PyDSParamException( - "Parameter definition do not support % for now.", type(self.definition) + "Parameter definition do not support % for now.", + type(getattr(self, "definition")), ) diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sql.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sql.py index 716a024daf01..4bebf8379da6 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sql.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sql.py @@ -59,6 +59,9 @@ class Sql(Task): "display_rows", } + ext: set = {".sql"} + ext_attr: str = "_sql" + def __init__( self, name: str, @@ -71,8 +74,8 @@ def __init__( *args, **kwargs ): + self._sql = sql super().__init__(name, TaskType.SQL, *args, **kwargs) - self.sql = sql self.param_sql_type = sql_type self.datasource_name = datasource_name self.pre_statements = pre_statements or [] @@ -101,7 +104,7 @@ def sql_type(self) -> str: "|(.* |)update |(.* |)truncate |(.* |)alter |(.* |)create ).*" ) pattern_select = re.compile(pattern_select_str, re.IGNORECASE) - if pattern_select.match(self.sql) is None: + if pattern_select.match(self._sql) is None: return SqlType.NOT_SELECT else: return SqlType.SELECT diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_datax.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_datax.py index 5d1890e83d07..95f65b315512 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_datax.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_datax.py @@ -16,12 +16,28 @@ # under the License. """Test Task DataX.""" - +from pathlib import Path from unittest.mock import patch import pytest +from pydolphinscheduler.resources_plugin import Local from pydolphinscheduler.tasks.datax import CustomDataX, DataX +from pydolphinscheduler.utils import file +from tests.testing.file import delete_file + + +@pytest.fixture() +def setup_crt_first(request): + """Set up and teardown about create file first and then delete it.""" + file_content = request.param.get("file_content") + file_path = request.param.get("file_path") + file.write( + content=file_content, + to_path=file_path, + ) + yield + delete_file(file_path) @patch( @@ -122,3 +138,76 @@ def test_custom_datax_get_define(json_template): ): task = CustomDataX(name, json_template) assert task.get_define() == expect + + +@pytest.mark.parametrize( + "setup_crt_first", + [ + { + "file_path": Path(__file__).parent.joinpath("local_res.sql"), + "file_content": "test local resource", + } + ], + indirect=True, +) +@pytest.mark.parametrize( + "attr, expect", + [ + ( + { + "name": "task_datax", + "datasource_name": "first_mysql", + "datatarget_name": "second_mysql", + "sql": "local_res.sql", + "target_table": "target_table", + "resource_plugin": Local(str(Path(__file__).parent)), + }, + "test local resource", + ), + ], +) +@patch( + "pydolphinscheduler.core.task.Task.gen_code_and_version", + return_value=(123, 1), +) +def test_resources_local_datax_command_content( + mock_code_version, attr, expect, setup_crt_first +): + """Test task datax sql content through the local resource plug-in.""" + datax = DataX(**attr) + assert expect == getattr(datax, "sql") + + +@pytest.mark.parametrize( + "setup_crt_first", + [ + { + "file_path": Path(__file__).parent.joinpath("local_res.json"), + "file_content": '{content: "test local resource"}', + } + ], + indirect=True, +) +@pytest.mark.parametrize( + "attr, expect", + [ + ( + { + "name": "task_custom_datax", + "json": "local_res.json", + "resource_plugin": Local(str(Path(__file__).parent)), + }, + '{content: "test local resource"}', + ), + ], +) +@patch( + "pydolphinscheduler.core.task.Task.gen_code_and_version", + return_value=(123, 1), +) +def test_resources_local_custom_datax_command_content( + mock_code_version, attr, expect, setup_crt_first +): + """Test task CustomDataX json content through the local resource plug-in.""" + custom_datax = CustomDataX(**attr) + assert expect == getattr(custom_datax, "json") diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py index e8f7f10d773c..77aa10625bc1 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py @@ -16,26 +16,42 @@ # under the License. """Test Task python.""" - - +from pathlib import Path from unittest.mock import patch import pytest from pydolphinscheduler.exceptions import PyDSParamException +from pydolphinscheduler.resources_plugin import Local from pydolphinscheduler.tasks.python import Python +from pydolphinscheduler.utils import file +from tests.testing.file import delete_file def foo(): # noqa: D103 print("hello world.") +@pytest.fixture() +def setup_crt_first(request): + """Set up and teardown about create file first and then delete it.""" + file_content = request.param.get("file_content") + file_path = request.param.get("file_path") + file.write( + content=file_content, + to_path=file_path, + ) + yield + delete_file(file_path) + + @pytest.mark.parametrize( "attr, expect", [ ( {"definition": "print(1)"}, { + "definition": "print(1)", "rawScript": "print(1)", "localParams": [], "resourceList": [], @@ -47,6 +63,7 @@ def foo(): # noqa: D103 ( {"definition": "def foo():\n print('I am foo')"}, { + "definition": "def foo():\n print('I am foo')", "rawScript": "def foo():\n print('I am foo')\nfoo()", "localParams": [], "resourceList": [], @@ -58,6 +75,7 @@ def foo(): # noqa: D103 ( {"definition": foo}, { + "definition": foo, "rawScript": 'def foo(): # noqa: D103\n print("hello world.")\nfoo()', "localParams": [], "resourceList": [], @@ -122,6 +140,7 @@ def test_python_get_define(name, script_code, raw): "delayTime": 0, "taskType": "PYTHON", "taskParams": { + "definition": script_code, "resourceList": [], "localParams": [], "rawScript": raw, @@ -145,3 +164,38 @@ def test_python_get_define(name, script_code, raw): ): shell = Python(name, script_code) assert shell.get_define() == expect + + +@pytest.mark.parametrize( + "setup_crt_first", + [ + { + "file_path": Path(__file__).parent.joinpath("local_res.py"), + "file_content": "test local resource", + } + ], + indirect=True, +) +@pytest.mark.parametrize( + "attr, expect", + [ + ( + { + "name": "task_python", + "definition": "local_res.py", + "resource_plugin": Local(str(Path(__file__).parent)), + }, + "test local resource", + ), + ], +) +@patch( + "pydolphinscheduler.core.task.Task.gen_code_and_version", + return_value=(123, 1), +) +def test_resources_local_python_command_content( + mock_code_version, attr, expect, setup_crt_first +): + """Test task Python definition content through the local resource plug-in.""" + python = Python(**attr) + assert expect == getattr(python, "definition") diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py index ba9daa9b2d36..a22d9206d0b7 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py @@ -16,13 +16,28 @@ # under the License. """Test Task Sql.""" - - +from pathlib import Path from unittest.mock import patch import pytest +from pydolphinscheduler.resources_plugin import Local from pydolphinscheduler.tasks.sql import Sql, SqlType +from pydolphinscheduler.utils import file +from tests.testing.file import delete_file + +file_name = "local_res.sql" +file_content = "select 1" +res_plugin_prefix = Path(__file__).parent +file_path = res_plugin_prefix.joinpath(file_name) + + +@pytest.fixture +def setup_crt_first(): + """Set up and teardown about create file first and then delete it.""" + file.write(content=file_content, to_path=file_path) + yield + delete_file(file_path) @pytest.mark.parametrize( @@ -165,3 +180,29 @@ def test_sql_get_define(mock_datasource): ): task = Sql(name, datasource_name, command) assert task.get_define() == expect + + +@pytest.mark.parametrize( + "attr, expect", + [ + ( + { + "name": "test-sql-local-res", + "sql": file_name, + "datasource_name": "test_datasource", + "resource_plugin": Local(str(res_plugin_prefix)), + }, + file_content, + ) + ], +) +@patch( + "pydolphinscheduler.core.task.Task.gen_code_and_version", + return_value=(123, 1), +) +def test_resources_local_sql_command_content( + mock_code_version, attr, expect, setup_crt_first +): + """Test sql content through the local resource plug-in.""" + sql = Sql(**attr) + assert expect == getattr(sql, "sql")