Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Task Plugin] Increase zeppelin task stability in production #11010

Merged
merged 6 commits into from
Jul 20, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions docs/docs/en/guide/task/zeppelin.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,21 @@ it will call `Zeppelin Client API` to trigger zeppelin notebook paragraph. Click
| Timeout alarm | Check Timeout Alarm and Timeout Failure. When the task exceeds the "timeout duration", an alarm email will be sent and the task execution will fail. |
| Zeppelin Note ID | The unique note id for a zeppelin notebook note. |
| Zeppelin Paragraph ID | The unique paragraph id for a zeppelin notebook paragraph. If you want to schedule a whole note at a time, leave this field blank. |
| Zeppelin Production Note Directory | The directory for cloned note in production mode. |
| Zeppelin Rest Endpoint | The REST endpoint of your zeppelin server |
| Zeppelin Parameters | Parameters in json format used for zeppelin dynamic form. |

## Production (Clone) Mode

- Fill in the optional `Zeppelin Production Note Directory` parameter to enable `Production Mode`.
- In `Production Mode`, the target note gets copied to the `Zeppelin Production Note Directory` you choose.
`Zeppelin Task Plugin` will execute the cloned note instead of the original one. Once execution done,
`Zeppelin Task Plugin` will delete the cloned note automatically.
Therefore, it increases the stability as the modification to a running note triggered by `Dolphin Scheduler`
will not affect the production task.
- If you leave the `Zeppelin Production Note Directory` empty, `Zeppelin Task Plugin` will execute the original note.
- 'Zeppelin Production Note Directory' should both start and end with a `slash`. e.g. `/production_note_directory/`

## Task Example

### Zeppelin Paragraph Task Example
Expand Down
11 changes: 11 additions & 0 deletions docs/docs/zh/guide/task/zeppelin.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,19 @@
- Zeppelin Note ID:Zeppelin Note对应的唯一ID。
- Zeppelin Paragraph ID:Zeppelin Paragraph对应的唯一ID。如果你想一次性调度整个note,这一栏不填即可。
- Zeppelin Rest Endpoint:您的Zeppelin服务的REST Endpoint。
- Zeppelin Production Note Directory:生产模式下存放克隆note的目录。
- Zeppelin Parameters: 用于传入Zeppelin Dynamic Form的参数。

## 生产(克隆)模式

- 填上`Zeppelin Production Note Directory`参数以启动`生产模式`。
- 在`生产模式`下,目标note会被克隆到您所填的`Zeppelin Production Note Directory`目录下。
`Zeppelin任务插件`将会执行克隆出来的note并在执行成功后自动清除它。
因为在此模式下,如果您不小心修改了正在被`Dolphin Scheduler`调度的note,也不会影响到生产任务的执行,
从而提高了稳定性。
- 如果您选择不填`Zeppelin Production Note Directory`这个参数,`Zeppelin任务插件`将会执行您的原始note。
'Zeppelin Production Note Directory'参数在格式上应该以`斜杠`开头和结尾,例如 `/production_note_directory/`。

## Task Example

### Zeppelin Paragraph Task Example
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class ZeppelinParameters extends AbstractParameters {
private String noteId;
private String paragraphId;
private String restEndpoint;
private String productionNoteDirectory;
private String parameters;

@Override
Expand Down Expand Up @@ -77,12 +78,21 @@ public void setRestEndpoint(String restEndpoint) {
this.restEndpoint = restEndpoint;
}

public String getproductionNoteDirectory() {
return productionNoteDirectory;
}

public void setproductionNoteDirectory(String productionNoteDirectory) {
this.productionNoteDirectory = productionNoteDirectory;
}

@Override
public String toString() {
return "ZeppelinParameters{"
+ "noteId='" + noteId + '\''
+ ", paragraphId='" + paragraphId + '\''
+ ", paragraphId='" + paragraphId + '\''
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you help to also refactor this class to use lombok annotations?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kezhenxu94 Certainly yes! I'm a lombok fan, lol.

+ ", restEndpoint='" + restEndpoint + '\''
+ ", productionNoteDirectory='" + productionNoteDirectory + '\''
+ ", parameters='" + parameters + '\''
+ '}';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.spi.utils.DateUtils;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.zeppelin.client.ClientConfig;
import org.apache.zeppelin.client.NoteResult;
Expand Down Expand Up @@ -76,9 +77,11 @@ public void init() {
@Override
public void handle() throws Exception {
try {
final String noteId = this.zeppelinParameters.getNoteId();
final String paragraphId = this.zeppelinParameters.getParagraphId();
final String productionNoteDirectory = this.zeppelinParameters.getproductionNoteDirectory();
final String parameters = this.zeppelinParameters.getParameters();
// noteId may be replaced with cloned noteId
String noteId = this.zeppelinParameters.getNoteId();
Map<String, String> zeppelinParamsMap = new HashMap<>();
if (parameters != null) {
ObjectMapper mapper = new ObjectMapper();
Expand All @@ -88,6 +91,16 @@ public void handle() throws Exception {
// Submit zeppelin task
String resultContent;
Status status = Status.FINISHED;
// If in production, clone the note and run the cloned one for stability
if (productionNoteDirectory != null) {
final String cloneNotePath = String.format(
"%s%s_%s",
productionNoteDirectory,
noteId,
DateUtils.getTimestampString());
noteId = this.zClient.cloneNote(noteId, cloneNotePath);
}

if (paragraphId == null) {
final NoteResult noteResult = this.zClient.executeNote(noteId, zeppelinParamsMap);
final List<ParagraphResult> paragraphResultList = noteResult.getParagraphResultList();
Expand All @@ -105,13 +118,19 @@ public void handle() throws Exception {
break;
}
}

resultContent = resultContentBuilder.toString();
} else {
final ParagraphResult paragraphResult = this.zClient.executeParagraph(noteId, paragraphId, zeppelinParamsMap);
resultContent = paragraphResult.getResultInText();
status = paragraphResult.getStatus();
}

// Delete cloned note
if (productionNoteDirectory != null) {
this.zClient.deleteNote(noteId);
}

// Use noteId-paragraph-Id as app id
final int exitStatusCode = mapStatusToExitCode(status);
setAppIds(String.format("%s-%s", noteId, paragraphId));
Expand All @@ -121,6 +140,7 @@ public void handle() throws Exception {
setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
logger.error("zeppelin task submit failed with error", e);
}

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.spi.utils.DateUtils;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;


Expand All @@ -51,9 +52,10 @@

@RunWith(PowerMockRunner.class)
@PrepareForTest({
ZeppelinTask.class,
ZeppelinClient.class,
ObjectMapper.class,
ZeppelinTask.class,
ZeppelinClient.class,
ObjectMapper.class,
DateUtils.class
})
@PowerMockIgnore({"javax.*"})
public class ZeppelinTaskTest {
Expand All @@ -62,6 +64,8 @@ public class ZeppelinTaskTest {
private static final String MOCK_PARAGRAPH_ID = "paragraph_1648793472526_1771221396";
private static final String MOCK_PARAMETERS = "{\"key1\": \"value1\", \"key2\": \"value2\"}";
private static final String MOCK_REST_ENDPOINT = "localhost:8080";
private static final String MOCK_CLONE_NOTE_ID = "3GYJR92R8";
private static final String MOCK_PRODUCTION_DIRECTORY = "/prod/";
private final ObjectMapper mapper = new ObjectMapper();

private ZeppelinClient zClient;
Expand Down Expand Up @@ -161,6 +165,37 @@ public void testHandleWithNoteExecutionSuccess() throws Exception {
Assert.assertEquals(EXIT_CODE_SUCCESS, this.zeppelinTask.getExitStatusCode());
}

@Test
public void testHandleWithNoteExecutionSuccessWithProductionSetting() throws Exception {
String zeppelinParametersWithNoParagraphId = buildZeppelinTaskParametersWithProductionSetting();
TaskExecutionContext taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class);
PowerMockito.mockStatic(DateUtils.class);
when(taskExecutionContext.getTaskParams()).thenReturn(zeppelinParametersWithNoParagraphId);
this.zeppelinTask = spy(new ZeppelinTask(taskExecutionContext));

// mock zClient and note result
this.zClient = mock(ZeppelinClient.class);
this.noteResult = mock(NoteResult.class);

// use mocked zClient in zeppelinTask
doReturn(this.zClient).when(this.zeppelinTask, "getZeppelinClient");
when(this.zClient.cloneNote(any(String.class), any(String.class))).thenReturn(MOCK_CLONE_NOTE_ID);
when(this.zClient.executeNote(any(), any(Map.class))).thenReturn(this.noteResult);
when(paragraphResult.getResultInText()).thenReturn("mock-zeppelin-paragraph-execution-result");
this.zeppelinTask.init();
when(this.paragraphResult.getStatus()).thenReturn(Status.FINISHED);
when(DateUtils.getTimestampString()).thenReturn("123456789");
this.zeppelinTask.handle();
Mockito.verify(this.zClient).cloneNote(
MOCK_NOTE_ID,
String.format("%s%s_%s", MOCK_PRODUCTION_DIRECTORY, MOCK_NOTE_ID, "123456789"));
Mockito.verify(this.zClient).executeNote(MOCK_CLONE_NOTE_ID,
(Map<String, String>) mapper.readValue(MOCK_PARAMETERS, Map.class));
Mockito.verify(this.noteResult).getParagraphResultList();
Mockito.verify(this.zClient).deleteNote(MOCK_CLONE_NOTE_ID);
Assert.assertEquals(EXIT_CODE_SUCCESS, this.zeppelinTask.getExitStatusCode());
}

private String buildZeppelinTaskParameters() {
ZeppelinParameters zeppelinParameters = new ZeppelinParameters();
zeppelinParameters.setNoteId(MOCK_NOTE_ID);
Expand All @@ -179,4 +214,15 @@ private String buildZeppelinTaskParametersWithNoParagraphId() {

return JSONUtils.toJsonString(zeppelinParameters);
}

private String buildZeppelinTaskParametersWithProductionSetting() {
ZeppelinParameters zeppelinParameters = new ZeppelinParameters();
zeppelinParameters.setNoteId(MOCK_NOTE_ID);
zeppelinParameters.setParameters(MOCK_PARAMETERS);
zeppelinParameters.setRestEndpoint(MOCK_REST_ENDPOINT);
zeppelinParameters.setproductionNoteDirectory(MOCK_PRODUCTION_DIRECTORY);

return JSONUtils.toJsonString(zeppelinParameters);
}

}
2 changes: 2 additions & 0 deletions dolphinscheduler-ui/src/locales/en_US/project.ts
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,8 @@ export default {
zeppelin_parameters_tips: 'Please enter the parameters for zeppelin dynamic form',
zeppelin_rest_endpoint: 'zeppelinRestEndpoint',
zeppelin_rest_endpoint_tips: 'Please enter the rest endpoint of your Zeppelin server',
zeppelin_production_note_directory: 'Directory for cloned zeppelin note in production mode',
zeppelin_production_note_directory_tips: 'Please enter the production note directory to enable production mode',
jupyter_conda_env_name: 'condaEnvName',
jupyter_conda_env_name_tips:
'Please enter the conda environment name of papermill',
Expand Down
2 changes: 2 additions & 0 deletions dolphinscheduler-ui/src/locales/zh_CN/project.ts
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,8 @@ export default {
zeppelin_note_id: 'zeppelinNoteId',
zeppelin_note_id_tips: '请输入zeppelin note id',
zeppelin_paragraph_id: 'zeppelinParagraphId',
zeppelin_production_note_directory: '生产模式下存放克隆note的目录',
zeppelin_production_note_directory_tips: '请输入生产环境note目录以启用生产模式',
zeppelin_paragraph_id_tips: '请输入zeppelin paragraph id',
zeppelin_parameters: 'parameters',
zeppelin_parameters_tips: '请输入zeppelin dynamic form参数',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,14 @@ export function useZeppelin(model: { [field: string]: any }): IJsonItem[] {
}
}
},
{
type: 'input',
field: 'zeppelinProductionNoteDirectory',
name: t('project.node.zeppelin_production_note_directory'),
props: {
placeholder: t('project.node.zeppelin_production_note_directory_tips')
}
},
{
type: 'input',
field: 'parameters',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ export function formatParams(data: INodeData): {
taskParams.noteId = data.zeppelinNoteId
taskParams.paragraphId = data.zeppelinParagraphId
taskParams.restEndpoint = data.zeppelinRestEndpoint
taskParams.productionNoteDirectory = data.zeppelinProductionNoteDirectory
taskParams.parameters = data.parameters
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,8 @@ interface ITaskParams {
zeppelinParagraphId?: string
zeppelinRestEndpoint?: string
restEndpoint?: string
zeppelinProductionNoteDirectory?: string
productionNoteDirectory?: string
noteId?: string
paragraphId?: string
condaEnvName?: string
Expand Down