Skip to content

Commit

Permalink
[Feature][Task Plugin] Enable users to switch endpoints in zeppelin t…
Browse files Browse the repository at this point in the history
…asks (#10925)

* [Feature][Task Plugin] Enable users to switch endpoints in zeppelin tasks (#9814)
  • Loading branch information
EricGao888 authored Jul 15, 2022
1 parent 81930e5 commit a38fa34
Show file tree
Hide file tree
Showing 13 changed files with 62 additions and 31 deletions.
3 changes: 0 additions & 3 deletions docs/docs/en/guide/resource/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,6 @@ development.state=false
# rpc port
alert.rpc.port=50052

# Url endpoint for zeppelin RESTful API
zeppelin.rest.url=http://localhost:8080

# set path of conda.sh
conda.path=/opt/anaconda3/etc/profile.d/conda.sh

Expand Down
1 change: 1 addition & 0 deletions docs/docs/en/guide/task/zeppelin.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ it will call `Zeppelin Client API` to trigger zeppelin notebook paragraph. Click
| Timeout alarm | Check Timeout Alarm and Timeout Failure. When the task exceeds the "timeout duration", an alarm email will be sent and the task execution will fail. |
| Zeppelin Note ID | The unique note id for a zeppelin notebook note. |
| Zeppelin Paragraph ID | The unique paragraph id for a zeppelin notebook paragraph. If you want to schedule a whole note at a time, leave this field blank. |
| Zeppelin Rest Endpoint | The REST endpoint of your zeppelin server |
| Zeppelin Parameters | Parameters in json format used for zeppelin dynamic form. |

## Task Example
Expand Down
1 change: 1 addition & 0 deletions docs/docs/zh/guide/task/zeppelin.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
- 前置任务:选择当前任务的前置任务,会将被选择的前置任务设置为当前任务的上游。
- Zeppelin Note ID:Zeppelin Note对应的唯一ID。
- Zeppelin Paragraph ID:Zeppelin Paragraph对应的唯一ID。如果你想一次性调度整个note,这一栏不填即可。
- Zeppelin Rest Endpoint:您的Zeppelin服务的REST Endpoint。
- Zeppelin Parameters: 用于传入Zeppelin Dynamic Form的参数。

## Task Example
Expand Down
3 changes: 0 additions & 3 deletions dolphinscheduler-common/src/main/resources/common.properties
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,6 @@ development.state=false
# rpc port
alert.rpc.port=50052

# Url endpoint for zeppelin RESTful API
zeppelin.rest.url=http://localhost:8080

# set path of conda.sh
conda.path=/opt/anaconda3/etc/profile.d/conda.sh

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,10 +421,6 @@ private TaskConstants() {
public static final int LOG_LINES = 500;
public static final String NAMESPACE_NAME = "name";
public static final String CLUSTER = "cluster";
/**
* zeppelin config
*/
public static final String ZEPPELIN_REST_URL = "zeppelin.rest.url";

/**
* conda config used by jupyter task plugin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@ public class ZeppelinParameters extends AbstractParameters {
*/
private String noteId;
private String paragraphId;
private String restEndpoint;
private String parameters;

@Override
public boolean checkParameters() {
return StringUtils.isNotEmpty(this.noteId);
return StringUtils.isNotEmpty(this.noteId) && StringUtils.isNotEmpty(this.restEndpoint);
}

@Override
Expand Down Expand Up @@ -68,13 +69,22 @@ public void setParameters(String parameters) {
this.parameters = parameters;
}

public String getRestEndpoint() {
return restEndpoint;
}

public void setRestEndpoint(String restEndpoint) {
this.restEndpoint = restEndpoint;
}

@Override
public String toString() {
return "ZeppelinParameters{" +
"noteId='" + noteId + '\'' +
", paragraphId='" + paragraphId + '\'' +
", parameters='" + parameters + '\'' +
'}';
return "ZeppelinParameters{"
+ "noteId='" + noteId + '\''
+ ", paragraphId='" + paragraphId + '\''
+ ", restEndpoint='" + restEndpoint + '\''
+ ", parameters='" + parameters + '\''
+ '}';
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
import org.apache.zeppelin.client.ClientConfig;
import org.apache.zeppelin.client.NoteResult;
import org.apache.zeppelin.client.ParagraphResult;
Expand Down Expand Up @@ -77,9 +76,9 @@ public void init() {
@Override
public void handle() throws Exception {
try {
String noteId = this.zeppelinParameters.getNoteId();
String paragraphId = this.zeppelinParameters.getParagraphId();
String parameters = this.zeppelinParameters.getParameters();
final String noteId = this.zeppelinParameters.getNoteId();
final String paragraphId = this.zeppelinParameters.getParagraphId();
final String parameters = this.zeppelinParameters.getParameters();
Map<String, String> zeppelinParamsMap = new HashMap<>();
if (parameters != null) {
ObjectMapper mapper = new ObjectMapper();
Expand All @@ -90,8 +89,8 @@ public void handle() throws Exception {
String resultContent;
Status status = Status.FINISHED;
if (paragraphId == null) {
NoteResult noteResult = this.zClient.executeNote(noteId, zeppelinParamsMap);
List<ParagraphResult> paragraphResultList = noteResult.getParagraphResultList();
final NoteResult noteResult = this.zClient.executeNote(noteId, zeppelinParamsMap);
final List<ParagraphResult> paragraphResultList = noteResult.getParagraphResultList();
StringBuilder resultContentBuilder = new StringBuilder();
for (ParagraphResult paragraphResult : paragraphResultList) {
resultContentBuilder.append(
Expand All @@ -108,7 +107,7 @@ public void handle() throws Exception {
}
resultContent = resultContentBuilder.toString();
} else {
ParagraphResult paragraphResult = this.zClient.executeParagraph(noteId, paragraphId, zeppelinParamsMap);
final ParagraphResult paragraphResult = this.zClient.executeParagraph(noteId, paragraphId, zeppelinParamsMap);
resultContent = paragraphResult.getResultInText();
status = paragraphResult.getStatus();
}
Expand All @@ -130,12 +129,12 @@ public void handle() throws Exception {
* @return ZeppelinClient
*/
private ZeppelinClient getZeppelinClient() {
final String zeppelinRestUrl = PropertyUtils.getString(TaskConstants.ZEPPELIN_REST_URL);
ClientConfig clientConfig = new ClientConfig(zeppelinRestUrl);
final String restEndpoint = zeppelinParameters.getRestEndpoint();
final ClientConfig clientConfig = new ClientConfig(restEndpoint);
ZeppelinClient zClient = null;
try {
zClient = new ZeppelinClient(clientConfig);
String zeppelinVersion = zClient.getVersion();
final String zeppelinVersion = zClient.getVersion();
logger.info("zeppelin version: {}", zeppelinVersion);
} catch (Exception e) {
// TODO: complete error handling
Expand Down Expand Up @@ -168,14 +167,15 @@ public AbstractParameters getParameters() {

@Override
public void cancelApplication(boolean status) throws Exception {
final String restEndpoint = this.zeppelinParameters.getRestEndpoint();
super.cancelApplication(status);
String noteId = this.zeppelinParameters.getNoteId();
String paragraphId = this.zeppelinParameters.getParagraphId();
final String noteId = this.zeppelinParameters.getNoteId();
final String paragraphId = this.zeppelinParameters.getParagraphId();
if (paragraphId == null) {
logger.info("trying terminate zeppelin task, taskId: {}, noteId: {}",
this.taskExecutionContext.getTaskInstanceId(),
noteId);
Unirest.config().defaultBaseUrl(PropertyUtils.getString(TaskConstants.ZEPPELIN_REST_URL) + "/api");
Unirest.config().defaultBaseUrl(restEndpoint + "/api");
Unirest.delete("/notebook/job/{noteId}").routeParam("noteId", noteId).asJson();
logger.info("zeppelin task terminated, taskId: {}, noteId: {}",
this.taskExecutionContext.getTaskInstanceId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public class ZeppelinTaskTest {
private static final String MOCK_NOTE_ID = "2GYJR92R7";
private static final String MOCK_PARAGRAPH_ID = "paragraph_1648793472526_1771221396";
private static final String MOCK_PARAMETERS = "{\"key1\": \"value1\", \"key2\": \"value2\"}";
private static final String MOCK_REST_ENDPOINT = "localhost:8080";
private final ObjectMapper mapper = new ObjectMapper();

private ZeppelinClient zClient;
Expand Down Expand Up @@ -164,6 +165,7 @@ private String buildZeppelinTaskParameters() {
ZeppelinParameters zeppelinParameters = new ZeppelinParameters();
zeppelinParameters.setNoteId(MOCK_NOTE_ID);
zeppelinParameters.setParagraphId(MOCK_PARAGRAPH_ID);
zeppelinParameters.setRestEndpoint(MOCK_REST_ENDPOINT);
zeppelinParameters.setParameters(MOCK_PARAMETERS);

return JSONUtils.toJsonString(zeppelinParameters);
Expand All @@ -173,6 +175,7 @@ private String buildZeppelinTaskParametersWithNoParagraphId() {
ZeppelinParameters zeppelinParameters = new ZeppelinParameters();
zeppelinParameters.setNoteId(MOCK_NOTE_ID);
zeppelinParameters.setParameters(MOCK_PARAMETERS);
zeppelinParameters.setRestEndpoint(MOCK_REST_ENDPOINT);

return JSONUtils.toJsonString(zeppelinParameters);
}
Expand Down
4 changes: 4 additions & 0 deletions dolphinscheduler-ui/src/locales/en_US/project.ts
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,10 @@ export default {
zeppelin_paragraph_id: 'zeppelinParagraphId',
zeppelin_paragraph_id_tips:
'Please enter the paragraph id of your zeppelin paragraph',
zeppelin_parameters: 'parameters',
zeppelin_parameters_tips: 'Please enter the parameters for zeppelin dynamic form',
zeppelin_rest_endpoint: 'zeppelinRestEndpoint',
zeppelin_rest_endpoint_tips: 'Please enter the rest endpoint of your Zeppelin server',
jupyter_conda_env_name: 'condaEnvName',
jupyter_conda_env_name_tips:
'Please enter the conda environment name of papermill',
Expand Down
6 changes: 4 additions & 2 deletions dolphinscheduler-ui/src/locales/zh_CN/project.ts
Original file line number Diff line number Diff line change
Expand Up @@ -605,12 +605,14 @@ export default {
emr_steps_define_json_tips: '请输入EMR步骤定义',
segment_separator: '分段执行符号',
segment_separator_tips: '请输入分段执行符号',
zeppelin_note_id: 'zeppelin_note_id',
zeppelin_note_id: 'zeppelinNoteId',
zeppelin_note_id_tips: '请输入zeppelin note id',
zeppelin_paragraph_id: 'zeppelin_paragraph_id',
zeppelin_paragraph_id: 'zeppelinParagraphId',
zeppelin_paragraph_id_tips: '请输入zeppelin paragraph id',
zeppelin_parameters: 'parameters',
zeppelin_parameters_tips: '请输入zeppelin dynamic form参数',
zeppelin_rest_endpoint: 'zeppelinRestEndpoint',
zeppelin_rest_endpoint_tips: '请输入zeppelin server的rest endpoint',
jupyter_conda_env_name: 'condaEnvName',
jupyter_conda_env_name_tips: '请输入papermill所在的conda环境名',
jupyter_input_note_path: 'inputNotePath',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,23 @@ export function useZeppelin(model: { [field: string]: any }): IJsonItem[] {
placeholder: t('project.node.zeppelin_paragraph_id_tips')
}
},
{
type: 'input',
field: 'zeppelinRestEndpoint',
name: t('project.node.zeppelin_rest_endpoint'),
props: {
placeholder: t('project.node.zeppelin_rest_endpoint_tips')
},
validate: {
trigger: ['input', 'blur'],
required: true,
validator(validate: any, value: string) {
if (!value) {
return new Error(t('project.node.zeppelin_rest_endpoint_tips'))
}
}
}
},
{
type: 'input',
field: 'parameters',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ export function formatParams(data: INodeData): {
if (data.taskType === 'ZEPPELIN') {
taskParams.noteId = data.zeppelinNoteId
taskParams.paragraphId = data.zeppelinParagraphId
taskParams.restEndpoint = data.zeppelinRestEndpoint
taskParams.parameters = data.parameters
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,8 @@ interface ITaskParams {
stepsDefineJson?: string
zeppelinNoteId?: string
zeppelinParagraphId?: string
zeppelinRestEndpoint?: string
restEndpoint?: string
noteId?: string
paragraphId?: string
condaEnvName?: string
Expand Down

0 comments on commit a38fa34

Please sign in to comment.