diff --git a/docs/docs/en/guide/task/zeppelin.md b/docs/docs/en/guide/task/zeppelin.md index c88c0b92b81e..8c8de9dae65c 100644 --- a/docs/docs/en/guide/task/zeppelin.md +++ b/docs/docs/en/guide/task/zeppelin.md @@ -26,9 +26,21 @@ it will call `Zeppelin Client API` to trigger zeppelin notebook paragraph. Click | Timeout alarm | Check Timeout Alarm and Timeout Failure. When the task exceeds the "timeout duration", an alarm email will be sent and the task execution will fail. | | Zeppelin Note ID | The unique note id for a zeppelin notebook note. | | Zeppelin Paragraph ID | The unique paragraph id for a zeppelin notebook paragraph. If you want to schedule a whole note at a time, leave this field blank. | +| Zeppelin Production Note Directory | The directory for cloned note in production mode. | | Zeppelin Rest Endpoint | The REST endpoint of your zeppelin server | | Zeppelin Parameters | Parameters in json format used for zeppelin dynamic form. | +## Production (Clone) Mode + +- Fill in the optional `Zeppelin Production Note Directory` parameter to enable `Production Mode`. +- In `Production Mode`, the target note gets copied to the `Zeppelin Production Note Directory` you choose. +`Zeppelin Task Plugin` will execute the cloned note instead of the original one. Once execution done, +`Zeppelin Task Plugin` will delete the cloned note automatically. +Therefore, it increases the stability as the modification to a running note triggered by `Dolphin Scheduler` +will not affect the production task. +- If you leave the `Zeppelin Production Note Directory` empty, `Zeppelin Task Plugin` will execute the original note. +- 'Zeppelin Production Note Directory' should both start and end with a `slash`. e.g. `/production_note_directory/` + ## Task Example ### Zeppelin Paragraph Task Example diff --git a/docs/docs/zh/guide/task/zeppelin.md b/docs/docs/zh/guide/task/zeppelin.md index eb802d5cfb4d..ac3bebc69011 100644 --- a/docs/docs/zh/guide/task/zeppelin.md +++ b/docs/docs/zh/guide/task/zeppelin.md @@ -24,8 +24,19 @@ - Zeppelin Note ID:Zeppelin Note对应的唯一ID。 - Zeppelin Paragraph ID:Zeppelin Paragraph对应的唯一ID。如果你想一次性调度整个note,这一栏不填即可。 - Zeppelin Rest Endpoint:您的Zeppelin服务的REST Endpoint。 +- Zeppelin Production Note Directory:生产模式下存放克隆note的目录。 - Zeppelin Parameters: 用于传入Zeppelin Dynamic Form的参数。 +## 生产(克隆)模式 + +- 填上`Zeppelin Production Note Directory`参数以启动`生产模式`。 +- 在`生产模式`下,目标note会被克隆到您所填的`Zeppelin Production Note Directory`目录下。 +`Zeppelin任务插件`将会执行克隆出来的note并在执行成功后自动清除它。 +因为在此模式下,如果您不小心修改了正在被`Dolphin Scheduler`调度的note,也不会影响到生产任务的执行, +从而提高了稳定性。 +- 如果您选择不填`Zeppelin Production Note Directory`这个参数,`Zeppelin任务插件`将会执行您的原始note。 +'Zeppelin Production Note Directory'参数在格式上应该以`斜杠`开头和结尾,例如 `/production_note_directory/`。 + ## Task Example ### Zeppelin Paragraph Task Example diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java index 0c67e4f69b9f..b95c93971f14 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java @@ -17,6 +17,9 @@ package org.apache.dolphinscheduler.plugin.task.zeppelin; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; import org.apache.dolphinscheduler.spi.utils.StringUtils; @@ -24,6 +27,9 @@ import java.util.Collections; import java.util.List; +@Getter +@Setter +@ToString public class ZeppelinParameters extends AbstractParameters { /** @@ -33,6 +39,7 @@ public class ZeppelinParameters extends AbstractParameters { private String noteId; private String paragraphId; private String restEndpoint; + private String productionNoteDirectory; private String parameters; @Override @@ -45,46 +52,4 @@ public List getResourceFilesList() { return Collections.emptyList(); } - public String getNoteId() { - return noteId; - } - - public void setNoteId(String noteId) { - this.noteId = noteId; - } - - public String getParagraphId() { - return paragraphId; - } - - public void setParagraphId(String paragraphId) { - this.paragraphId = paragraphId; - } - - public String getParameters() { - return parameters; - } - - public void setParameters(String parameters) { - this.parameters = parameters; - } - - public String getRestEndpoint() { - return restEndpoint; - } - - public void setRestEndpoint(String restEndpoint) { - this.restEndpoint = restEndpoint; - } - - @Override - public String toString() { - return "ZeppelinParameters{" - + "noteId='" + noteId + '\'' - + ", paragraphId='" + paragraphId + '\'' - + ", restEndpoint='" + restEndpoint + '\'' - + ", parameters='" + parameters + '\'' - + '}'; - } - } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java index ca850dcbbc45..4fe0120e2481 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java @@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; +import org.apache.dolphinscheduler.spi.utils.DateUtils; import org.apache.dolphinscheduler.spi.utils.JSONUtils; import org.apache.zeppelin.client.ClientConfig; import org.apache.zeppelin.client.NoteResult; @@ -76,9 +77,11 @@ public void init() { @Override public void handle() throws Exception { try { - final String noteId = this.zeppelinParameters.getNoteId(); final String paragraphId = this.zeppelinParameters.getParagraphId(); + final String productionNoteDirectory = this.zeppelinParameters.getProductionNoteDirectory(); final String parameters = this.zeppelinParameters.getParameters(); + // noteId may be replaced with cloned noteId + String noteId = this.zeppelinParameters.getNoteId(); Map zeppelinParamsMap = new HashMap<>(); if (parameters != null) { ObjectMapper mapper = new ObjectMapper(); @@ -88,6 +91,16 @@ public void handle() throws Exception { // Submit zeppelin task String resultContent; Status status = Status.FINISHED; + // If in production, clone the note and run the cloned one for stability + if (productionNoteDirectory != null) { + final String cloneNotePath = String.format( + "%s%s_%s", + productionNoteDirectory, + noteId, + DateUtils.getTimestampString()); + noteId = this.zClient.cloneNote(noteId, cloneNotePath); + } + if (paragraphId == null) { final NoteResult noteResult = this.zClient.executeNote(noteId, zeppelinParamsMap); final List paragraphResultList = noteResult.getParagraphResultList(); @@ -105,6 +118,7 @@ public void handle() throws Exception { break; } } + resultContent = resultContentBuilder.toString(); } else { final ParagraphResult paragraphResult = this.zClient.executeParagraph(noteId, paragraphId, zeppelinParamsMap); @@ -112,6 +126,11 @@ public void handle() throws Exception { status = paragraphResult.getStatus(); } + // Delete cloned note + if (productionNoteDirectory != null) { + this.zClient.deleteNote(noteId); + } + // Use noteId-paragraph-Id as app id final int exitStatusCode = mapStatusToExitCode(status); setAppIds(String.format("%s-%s", noteId, paragraphId)); @@ -121,6 +140,7 @@ public void handle() throws Exception { setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE); logger.error("zeppelin task submit failed with error", e); } + } /** diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java index 4670e87872d9..397d68347e2c 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java @@ -29,6 +29,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.spi.utils.DateUtils; import org.apache.dolphinscheduler.spi.utils.JSONUtils; @@ -51,9 +52,10 @@ @RunWith(PowerMockRunner.class) @PrepareForTest({ - ZeppelinTask.class, - ZeppelinClient.class, - ObjectMapper.class, + ZeppelinTask.class, + ZeppelinClient.class, + ObjectMapper.class, + DateUtils.class }) @PowerMockIgnore({"javax.*"}) public class ZeppelinTaskTest { @@ -62,6 +64,8 @@ public class ZeppelinTaskTest { private static final String MOCK_PARAGRAPH_ID = "paragraph_1648793472526_1771221396"; private static final String MOCK_PARAMETERS = "{\"key1\": \"value1\", \"key2\": \"value2\"}"; private static final String MOCK_REST_ENDPOINT = "localhost:8080"; + private static final String MOCK_CLONE_NOTE_ID = "3GYJR92R8"; + private static final String MOCK_PRODUCTION_DIRECTORY = "/prod/"; private final ObjectMapper mapper = new ObjectMapper(); private ZeppelinClient zClient; @@ -161,6 +165,37 @@ public void testHandleWithNoteExecutionSuccess() throws Exception { Assert.assertEquals(EXIT_CODE_SUCCESS, this.zeppelinTask.getExitStatusCode()); } + @Test + public void testHandleWithNoteExecutionSuccessWithProductionSetting() throws Exception { + String zeppelinParametersWithNoParagraphId = buildZeppelinTaskParametersWithProductionSetting(); + TaskExecutionContext taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class); + PowerMockito.mockStatic(DateUtils.class); + when(taskExecutionContext.getTaskParams()).thenReturn(zeppelinParametersWithNoParagraphId); + this.zeppelinTask = spy(new ZeppelinTask(taskExecutionContext)); + + // mock zClient and note result + this.zClient = mock(ZeppelinClient.class); + this.noteResult = mock(NoteResult.class); + + // use mocked zClient in zeppelinTask + doReturn(this.zClient).when(this.zeppelinTask, "getZeppelinClient"); + when(this.zClient.cloneNote(any(String.class), any(String.class))).thenReturn(MOCK_CLONE_NOTE_ID); + when(this.zClient.executeNote(any(), any(Map.class))).thenReturn(this.noteResult); + when(paragraphResult.getResultInText()).thenReturn("mock-zeppelin-paragraph-execution-result"); + this.zeppelinTask.init(); + when(this.paragraphResult.getStatus()).thenReturn(Status.FINISHED); + when(DateUtils.getTimestampString()).thenReturn("123456789"); + this.zeppelinTask.handle(); + Mockito.verify(this.zClient).cloneNote( + MOCK_NOTE_ID, + String.format("%s%s_%s", MOCK_PRODUCTION_DIRECTORY, MOCK_NOTE_ID, "123456789")); + Mockito.verify(this.zClient).executeNote(MOCK_CLONE_NOTE_ID, + (Map) mapper.readValue(MOCK_PARAMETERS, Map.class)); + Mockito.verify(this.noteResult).getParagraphResultList(); + Mockito.verify(this.zClient).deleteNote(MOCK_CLONE_NOTE_ID); + Assert.assertEquals(EXIT_CODE_SUCCESS, this.zeppelinTask.getExitStatusCode()); + } + private String buildZeppelinTaskParameters() { ZeppelinParameters zeppelinParameters = new ZeppelinParameters(); zeppelinParameters.setNoteId(MOCK_NOTE_ID); @@ -179,4 +214,15 @@ private String buildZeppelinTaskParametersWithNoParagraphId() { return JSONUtils.toJsonString(zeppelinParameters); } + + private String buildZeppelinTaskParametersWithProductionSetting() { + ZeppelinParameters zeppelinParameters = new ZeppelinParameters(); + zeppelinParameters.setNoteId(MOCK_NOTE_ID); + zeppelinParameters.setParameters(MOCK_PARAMETERS); + zeppelinParameters.setRestEndpoint(MOCK_REST_ENDPOINT); + zeppelinParameters.setProductionNoteDirectory(MOCK_PRODUCTION_DIRECTORY); + + return JSONUtils.toJsonString(zeppelinParameters); + } + } \ No newline at end of file diff --git a/dolphinscheduler-ui/src/locales/en_US/project.ts b/dolphinscheduler-ui/src/locales/en_US/project.ts index 2ab410ed2dfc..dc7257420c8c 100644 --- a/dolphinscheduler-ui/src/locales/en_US/project.ts +++ b/dolphinscheduler-ui/src/locales/en_US/project.ts @@ -621,6 +621,8 @@ export default { zeppelin_parameters_tips: 'Please enter the parameters for zeppelin dynamic form', zeppelin_rest_endpoint: 'zeppelinRestEndpoint', zeppelin_rest_endpoint_tips: 'Please enter the rest endpoint of your Zeppelin server', + zeppelin_production_note_directory: 'Directory for cloned zeppelin note in production mode', + zeppelin_production_note_directory_tips: 'Please enter the production note directory to enable production mode', jupyter_conda_env_name: 'condaEnvName', jupyter_conda_env_name_tips: 'Please enter the conda environment name of papermill', diff --git a/dolphinscheduler-ui/src/locales/zh_CN/project.ts b/dolphinscheduler-ui/src/locales/zh_CN/project.ts index d6a4758f772f..5be44bf3c24f 100644 --- a/dolphinscheduler-ui/src/locales/zh_CN/project.ts +++ b/dolphinscheduler-ui/src/locales/zh_CN/project.ts @@ -608,6 +608,8 @@ export default { zeppelin_note_id: 'zeppelinNoteId', zeppelin_note_id_tips: '请输入zeppelin note id', zeppelin_paragraph_id: 'zeppelinParagraphId', + zeppelin_production_note_directory: '生产模式下存放克隆note的目录', + zeppelin_production_note_directory_tips: '请输入生产环境note目录以启用生产模式', zeppelin_paragraph_id_tips: '请输入zeppelin paragraph id', zeppelin_parameters: 'parameters', zeppelin_parameters_tips: '请输入zeppelin dynamic form参数', diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts index a3077dbd2f85..db16b9bbe1bb 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts @@ -64,6 +64,14 @@ export function useZeppelin(model: { [field: string]: any }): IJsonItem[] { } } }, + { + type: 'input', + field: 'zeppelinProductionNoteDirectory', + name: t('project.node.zeppelin_production_note_directory'), + props: { + placeholder: t('project.node.zeppelin_production_note_directory_tips') + } + }, { type: 'input', field: 'parameters', diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts index 20209639e5a0..7a3878bc5846 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts @@ -330,6 +330,7 @@ export function formatParams(data: INodeData): { taskParams.noteId = data.zeppelinNoteId taskParams.paragraphId = data.zeppelinParagraphId taskParams.restEndpoint = data.zeppelinRestEndpoint + taskParams.productionNoteDirectory = data.zeppelinProductionNoteDirectory taskParams.parameters = data.parameters } diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts index ced5d4375997..268d8635b8d2 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts @@ -300,6 +300,8 @@ interface ITaskParams { zeppelinParagraphId?: string zeppelinRestEndpoint?: string restEndpoint?: string + zeppelinProductionNoteDirectory?: string + productionNoteDirectory?: string noteId?: string paragraphId?: string condaEnvName?: string