Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add pinball scheduler to dr-elephant #253

Merged
merged 2 commits into from
Jun 12, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions app-conf/SchedulerConf.xml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@
</params>
</scheduler>

<scheduler>
<name>pinball</name>
<classname>com.linkedin.drelephant.schedulers.PinballScheduler</classname>
</scheduler>

<scheduler>
<name>no_scheduler</name>
<classname>com.linkedin.drelephant.schedulers.NoScheduler</classname>
Expand Down
105 changes: 105 additions & 0 deletions app/com/linkedin/drelephant/schedulers/PinballScheduler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package com.linkedin.drelephant.schedulers;

import com.linkedin.drelephant.configurations.scheduler.SchedulerConfigurationData;
import com.linkedin.drelephant.util.Utils;

import java.util.Properties;
import org.apache.log4j.Logger;

/**
* This class provides methods to load information specific to the Pinball scheduler.
*/
public class PinballScheduler implements Scheduler {

private static final Logger logger = Logger.getLogger(PinballScheduler.class);

public static final String PINBALL_WORKFLOW = "pinball.workflow";
public static final String PINBALL_INSTANCE = "pinball.instance";
public static final String PINBALL_JOB = "pinball.job";
public static final String PINBALL_EXECUTION = "pinball.execution";
public static final String PINBALL_BASE_URL = "scheduler.url";
public static final String PINBALL_BASE_URL_DEFAULT = "http://localhost:8080";

private String _schedulerName;
private String _jobName;
private String _jobExecutionId;
private String _workflowName;
private String _workflowInstanceId;
private String _baseUrl;

public PinballScheduler(String appId, Properties properties, SchedulerConfigurationData schedulerConfData) {
_schedulerName = schedulerConfData.getSchedulerName();
if (properties != null) {
loadInfo(appId, properties);
}
}

private void loadInfo(String appId, Properties properties) {
_workflowName = properties.getProperty(PINBALL_WORKFLOW);
_workflowInstanceId = properties.getProperty(PINBALL_INSTANCE);
_jobName = properties.getProperty(PINBALL_JOB); //
_jobExecutionId = properties.getProperty(PINBALL_EXECUTION);
_baseUrl = Utils.formatStringOrNull("%s", properties.getProperty(PINBALL_BASE_URL));
}

@Override
public String getSchedulerName() {
return _schedulerName;
}

@Override
public boolean isEmpty() {
return _jobName == null || _jobExecutionId == null || _workflowName == null || _workflowInstanceId == null;
}

@Override
public String getJobDefId() {
return Utils.formatStringOrNull("%s/%s", _workflowName, _jobName);
}

@Override
public String getJobExecId() {
return Utils.formatStringOrNull("%s/%s/%s/%s", _workflowName, _workflowInstanceId, _jobName, _jobExecutionId);
}

@Override
public String getFlowDefId() {
return Utils.formatStringOrNull("%s", _workflowName);
}

@Override
public String getFlowExecId() {
return Utils.formatStringOrNull("%s/%s", _workflowName, _workflowInstanceId);
}

@Override
public String getJobDefUrl() {
return Utils.formatStringOrNull("%s/executions/?workflow=%s&instance=%s&job=%s",
_baseUrl, _workflowName, _workflowInstanceId, _jobName);
}

@Override
public String getJobExecUrl() {
return Utils.formatStringOrNull("%s/execution/?workflow=%s&instance=%s&job=%s&execution=%s",
_baseUrl, _workflowName, _workflowInstanceId, _jobName, _jobExecutionId);
}

@Override
public String getFlowDefUrl() {
return Utils.formatStringOrNull("%s/instances/?workflow=%s", _baseUrl, _workflowName);
}

@Override
public String getFlowExecUrl() {
return Utils.formatStringOrNull("%s/jobs/?workflow=%s&instance=%s", _baseUrl, _workflowName, _workflowInstanceId);
}

// Sub-workflow is not supported on Pinball
@Override
public int getWorkflowDepth() {
return 0;
}

@Override
public String getJobName() { return _jobName; }
}
106 changes: 106 additions & 0 deletions test/com/linkedin/drelephant/schedulers/PinballSchedulerTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package com.linkedin.drelephant.schedulers;

import com.linkedin.drelephant.configurations.scheduler.SchedulerConfigurationData;

import java.util.Properties;
import org.junit.Test;

import static com.linkedin.drelephant.schedulers.PinballScheduler.PINBALL_WORKFLOW;
import static com.linkedin.drelephant.schedulers.PinballScheduler.PINBALL_INSTANCE;
import static com.linkedin.drelephant.schedulers.PinballScheduler.PINBALL_JOB;
import static com.linkedin.drelephant.schedulers.PinballScheduler.PINBALL_EXECUTION;
import static com.linkedin.drelephant.schedulers.PinballScheduler.PINBALL_BASE_URL;

import static org.junit.Assert.assertEquals;


public class PinballSchedulerTest {

@Test
public void testPinballLoadInfoWithCompleteConf() {
PinballScheduler pinballScheduler = new PinballScheduler("id", getPinballProperties(), getSchedulerConfData());

assertEquals("http://localhost:8080/instances/?workflow=workflow_name", pinballScheduler.getFlowDefUrl());
assertEquals("workflow_name", pinballScheduler.getFlowDefId());
assertEquals("http://localhost:8080/jobs/?workflow=workflow_name&instance=workflow_instance", pinballScheduler.getFlowExecUrl());
assertEquals("workflow_name/workflow_instance", pinballScheduler.getFlowExecId());

assertEquals("http://localhost:8080/executions/?workflow=workflow_name&instance=workflow_instance&job=job_name", pinballScheduler.getJobDefUrl());
assertEquals("workflow_name/job_name", pinballScheduler.getJobDefId());
assertEquals("http://localhost:8080/execution/?workflow=workflow_name&instance=workflow_instance&job=job_name&execution=job_execution", pinballScheduler.getJobExecUrl());
assertEquals("workflow_name/workflow_instance/job_name/job_execution", pinballScheduler.getJobExecId());

assertEquals("job_name", pinballScheduler.getJobName());
assertEquals(0, pinballScheduler.getWorkflowDepth());
assertEquals("pinball", pinballScheduler.getSchedulerName());
}

@Test
public void testPinballLoadInfoWithMissingProperty() {
PinballScheduler pinballScheduler = new PinballScheduler("id", getPropertiesAndRemove(PINBALL_JOB), getSchedulerConfData());

assertEquals("http://localhost:8080/instances/?workflow=workflow_name", pinballScheduler.getFlowDefUrl());
assertEquals("workflow_name", pinballScheduler.getFlowDefId());
assertEquals("http://localhost:8080/jobs/?workflow=workflow_name&instance=workflow_instance", pinballScheduler.getFlowExecUrl());
assertEquals("workflow_name/workflow_instance", pinballScheduler.getFlowExecId());

assertEquals(null, pinballScheduler.getJobDefUrl());
assertEquals(null, pinballScheduler.getJobDefId());
assertEquals(null, pinballScheduler.getJobExecUrl());
assertEquals(null, pinballScheduler.getJobExecId());

assertEquals(null, pinballScheduler.getJobName());
assertEquals(0, pinballScheduler.getWorkflowDepth());
assertEquals("pinball", pinballScheduler.getSchedulerName());
}

@Test
public void testPinballLoadInfoWithNullProperty() {
PinballScheduler pinballScheduler = new PinballScheduler("id", null, getSchedulerConfData());

assertEquals(null, pinballScheduler.getFlowDefUrl());
assertEquals(null, pinballScheduler.getFlowDefId());
assertEquals(null, pinballScheduler.getFlowExecId());
assertEquals(null, pinballScheduler.getFlowExecUrl());

assertEquals(null, pinballScheduler.getJobDefId());
assertEquals(null, pinballScheduler.getJobDefUrl());
assertEquals(null, pinballScheduler.getJobExecId());
assertEquals(null, pinballScheduler.getJobExecUrl());

assertEquals(null, pinballScheduler.getJobName());
assertEquals(0, pinballScheduler.getWorkflowDepth());
assertEquals("pinball", pinballScheduler.getSchedulerName());
}

@Test
public void testPinballLoadsNameFromConfData() {
PinballScheduler pinballScheduler = new PinballScheduler("id", null, getSchedulerConfData("othername"));
assertEquals("othername", pinballScheduler.getSchedulerName());
}

private static Properties getPinballProperties() {
Properties properties = new Properties();
properties.put(PINBALL_WORKFLOW, "workflow_name");
properties.put(PINBALL_INSTANCE, "workflow_instance");
properties.put(PINBALL_JOB, "job_name");
properties.put(PINBALL_EXECUTION, "job_execution");
properties.put(PINBALL_BASE_URL, "http://localhost:8080");

return properties;
}

private static Properties getPropertiesAndRemove(String key) {
Properties properties = getPinballProperties();
properties.remove(key);
return properties;
}

private static SchedulerConfigurationData getSchedulerConfData() {
return getSchedulerConfData("pinball");
}

private static SchedulerConfigurationData getSchedulerConfData(String name) {
return new SchedulerConfigurationData(name, null, null);
}
}