For #156 add task_id into job execution event.
haocao committed Nov 8, 2016
1 parent a58d4f6 commit 6db30a0
Showing 20 changed files with 74 additions and 54 deletions.
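In short, the change threads a task ID through the job execution event pipeline: ShardingContexts and JobExecutionEvent each gain taskId as a new first constructor argument, executors copy it from the sharding contexts into every execution event they emit, and the RDB event storage persists it in a new task_id column. The snippet below is a sketch of the two new call shapes, pieced together from the diffs that follow; it is not a verbatim excerpt, and imports plus surrounding code are omitted.

// Sketch only: constructor argument order is taken from the call sites changed in this commit.
Map<Integer, String> shardingItemParameters = Collections.singletonMap(0, "A");
ShardingContexts shardingContexts = new ShardingContexts(
        "fake_task_id", "test_job", 3, "", shardingItemParameters);

// Executors now propagate the same task ID into each JobExecutionEvent they emit.
JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(
        shardingContexts.getTaskId(), "test_job", JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER, 0);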
@@ -58,7 +58,7 @@ public CloudJobFacadeTest() {
private ShardingContexts getShardingContexts() {
Map<Integer, String> shardingItemParameters = new HashMap<>(1, 1);
shardingItemParameters.put(0, "A");
return new ShardingContexts("test_job", 3, "", shardingItemParameters);
return new ShardingContexts("fake_task_id", "test_job", 3, "", shardingItemParameters);
}

private Map<String, String> getJobConfigurationMap(final JobType jobType, final boolean streamingProcess) {
@@ -149,7 +149,7 @@ private TaskInfo.Builder buildTaskInfo(final Map<String, String> jobConfiguratio

private byte[] serialize(final Map<String, String> jobConfigurationContext) {
LinkedHashMap<String, Object> result = new LinkedHashMap<>(2, 1);
ShardingContexts shardingContexts = new ShardingContexts("test_job", 1, "", Collections.singletonMap(1, "a"));
ShardingContexts shardingContexts = new ShardingContexts("fake_task_id", "test_job", 1, "", Collections.singletonMap(1, "a"));
result.put("shardingContext", shardingContexts);
result.put("jobConfigContext", jobConfigurationContext);
return SerializationUtils.serialize(result);
@@ -133,8 +133,8 @@ private Protos.TaskInfo getTaskInfo(final Protos.SlaveID slaveID, final TaskAssi
Map<Integer, String> assignedShardingItemParameters = new HashMap<>(1, 1);
int shardingItem = taskContext.getMetaInfo().getShardingItem();
assignedShardingItemParameters.put(shardingItem, shardingItemParameters.containsKey(shardingItem) ? shardingItemParameters.get(shardingItem) : "");
- ShardingContexts shardingContexts = new ShardingContexts(
- jobConfig.getJobName(), jobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount(), jobConfig.getTypeConfig().getCoreConfig().getJobParameter(), assignedShardingItemParameters);
+ ShardingContexts shardingContexts = new ShardingContexts(taskAssignmentResult.getTaskId(), jobConfig.getJobName(), jobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount(),
+ jobConfig.getTypeConfig().getCoreConfig().getJobParameter(), assignedShardingItemParameters);
Protos.CommandInfo.URI uri = Protos.CommandInfo.URI.newBuilder().setValue(jobConfig.getAppURL()).setExtract(true)
.setCache(BootstrapEnvironment.getInstance().getFrameworkConfiguration().isAppCacheEnable()).build();
Protos.CommandInfo command = Protos.CommandInfo.newBuilder().addUris(uri).setShell(true).setValue(jobConfig.getBootstrapScript()).build();
@@ -31,7 +31,7 @@

public final class TaskInfoDataTest {

- private final ShardingContexts shardingContexts = new ShardingContexts("test_job", 3, "test_param", Collections.<Integer, String>emptyMap());
+ private final ShardingContexts shardingContexts = new ShardingContexts("fake_task_id", "test_job", 3, "test_param", Collections.<Integer, String>emptyMap());

@Test
public void assertSerializeSimpleJob() {
@@ -98,6 +98,7 @@ private void createJobExecutionTable(final Connection conn) throws SQLException
String dbSchema = "CREATE TABLE `" + TABLE_JOB_EXECUTION_LOG + "` ("
+ "`id` VARCHAR(40) NOT NULL, "
+ "`job_name` VARCHAR(100) NOT NULL, "
+ "`task_id` VARCHAR(1000) NOT NULL, "
+ "`hostname` VARCHAR(255) NOT NULL, "
+ "`ip` VARCHAR(50) NOT NULL, "
+ "`sharding_item` INT NOT NULL, "
@@ -159,19 +160,20 @@ boolean addJobTraceEvent(final JobTraceEvent traceEvent) {
boolean addJobExecutionEvent(final JobExecutionEvent jobExecutionEvent) {
boolean result = false;
if (null == jobExecutionEvent.getCompleteTime()) {
String sql = "INSERT INTO `" + TABLE_JOB_EXECUTION_LOG + "` (`id`, `job_name`, `hostname`, `ip`, `sharding_item`, `execution_source`, `is_success`, `start_time`) "
+ "VALUES (?, ?, ?, ?, ?, ?, ?, ?);";
String sql = "INSERT INTO `" + TABLE_JOB_EXECUTION_LOG + "` (`id`, `job_name`, `task_id`, `hostname`, `ip`, `sharding_item`, `execution_source`, `is_success`, `start_time`) "
+ "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?);";
try (
Connection conn = dataSource.getConnection();
PreparedStatement preparedStatement = conn.prepareStatement(sql)) {
preparedStatement.setString(1, jobExecutionEvent.getId());
preparedStatement.setString(2, jobExecutionEvent.getJobName());
- preparedStatement.setString(3, jobExecutionEvent.getHostname());
- preparedStatement.setString(4, jobExecutionEvent.getIp());
- preparedStatement.setInt(5, jobExecutionEvent.getShardingItem());
- preparedStatement.setString(6, jobExecutionEvent.getSource().toString());
- preparedStatement.setBoolean(7, jobExecutionEvent.isSuccess());
- preparedStatement.setTimestamp(8, new Timestamp(jobExecutionEvent.getStartTime().getTime()));
+ preparedStatement.setString(3, jobExecutionEvent.getTaskId());
+ preparedStatement.setString(4, jobExecutionEvent.getHostname());
+ preparedStatement.setString(5, jobExecutionEvent.getIp());
+ preparedStatement.setInt(6, jobExecutionEvent.getShardingItem());
+ preparedStatement.setString(7, jobExecutionEvent.getSource().toString());
+ preparedStatement.setBoolean(8, jobExecutionEvent.isSuccess());
+ preparedStatement.setTimestamp(9, new Timestamp(jobExecutionEvent.getStartTime().getTime()));
preparedStatement.execute();
result = true;
} catch (final SQLException ex) {
@@ -214,7 +216,7 @@ boolean addJobExecutionEvent(final JobExecutionEvent jobExecutionEvent) {

boolean addJobStatusTraceEvent(final JobStatusTraceEvent jobStatusTraceEvent) {
boolean result = false;
String sql = "INSERT INTO `" + TABLE_JOB_TRACE_LOG + "` (`id`, `job_name`, `task_id`, `slave_id`, `execution_type`, `hostname`, `ip`, `sharding_item`, "
String sql = "INSERT INTO `" + TABLE_JOB_STATUS_TRACE_LOG + "` (`id`, `job_name`, `task_id`, `slave_id`, `execution_type`, `hostname`, `ip`, `sharding_item`, "
+ "`state`, `message`, `creation_time`) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);";
try (
Connection conn = dataSource.getConnection();
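For orientation, the fragment below sketches how execution records could be read back by task ID once the new column is in place. It mirrors the JDBC style of the storage class above but is purely illustrative: the dataSource and taskId variables are placeholders, only columns visible in this diff are selected, and the surrounding method, imports, and exception handling are omitted.

// Hypothetical read path, not part of this commit.
String sql = "SELECT `job_name`, `sharding_item`, `execution_source`, `is_success` FROM `" + TABLE_JOB_EXECUTION_LOG + "` WHERE `task_id` = ?;";
try (
        Connection conn = dataSource.getConnection();
        PreparedStatement preparedStatement = conn.prepareStatement(sql)) {
    preparedStatement.setString(1, taskId);
    try (ResultSet resultSet = preparedStatement.executeQuery()) {
        while (resultSet.next()) {
            // One row per sharding item executed under this task.
        }
    }
}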
@@ -44,6 +44,8 @@ public class JobExecutionEvent implements JobEvent {

private final String ip = localHostService.getIp();

+ private final String taskId;
+
private final String jobName;

private final ExecutionSource source;
@@ -144,13 +144,13 @@ private void process(final ShardingContexts shardingContexts, final JobExecution
Collection<Integer> items = shardingContexts.getShardingItemParameters().keySet();
if (1 == items.size()) {
int item = shardingContexts.getShardingItemParameters().keySet().iterator().next();
- JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(jobName, executionSource, item);
+ JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, item);
process(shardingContexts, item, jobExecutionEvent);
return;
}
final CountDownLatch latch = new CountDownLatch(items.size());
for (final int each : items) {
- final JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(jobName, executionSource, each);
+ final JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, each);
executorService.submit(new Runnable() {

@Override
@@ -36,6 +36,11 @@ public final class ShardingContexts implements Serializable {

private static final long serialVersionUID = -4585977349142082152L;

+ /**
+  * 作业任务ID (job task ID).
+  */
+ private final String taskId;
+
/**
 * 作业名称 (job name).
 */
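Taken together with the five-argument constructor calls elsewhere in this commit, the ShardingContexts value object now looks roughly as outlined below. This is a reconstruction, not the actual source: the field names after jobName and the hand-written constructor/getter are assumptions (the real class may generate them, e.g. via Lombok), and imports are omitted.

public final class ShardingContexts implements Serializable {

    private static final long serialVersionUID = -4585977349142082152L;

    private final String taskId;                                // new in this commit

    private final String jobName;

    private final int shardingTotalCount;                       // name assumed

    private final String jobParameter;                          // name assumed

    private final Map<Integer, String> shardingItemParameters;

    public ShardingContexts(final String taskId, final String jobName, final int shardingTotalCount,
                            final String jobParameter, final Map<Integer, String> shardingItemParameters) {
        this.taskId = taskId;
        this.jobName = jobName;
        this.shardingTotalCount = shardingTotalCount;
        this.jobParameter = jobParameter;
        this.shardingItemParameters = shardingItemParameters;
    }

    public String getTaskId() {
        return taskId;
    }

    // remaining getters omitted
}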
@@ -47,7 +47,7 @@ public final class JobEventBusTest {
@Test
public void assertPost() throws InterruptedException {
jobEventBus = new JobEventBus(new TestJobEventConfiguration(jobEventCaller));
jobEventBus.post(new JobExecutionEvent("test_event_bus_job", ExecutionSource.NORMAL_TRIGGER, 0));
jobEventBus.post(new JobExecutionEvent("fake_task_id", "test_event_bus_job", ExecutionSource.NORMAL_TRIGGER, 0));
while (!TestJobEventListener.isExecutionEventCalled()) {
Thread.sleep(100L);
}
@@ -58,7 +58,7 @@ public void assertPost() throws InterruptedException {
public void assertPostWithoutListener() throws NoSuchFieldException {
jobEventBus = new JobEventBus();
ReflectionUtils.setFieldValue(jobEventBus, "eventBus", eventBus);
jobEventBus.post(new JobExecutionEvent("test_event_bus_job", ExecutionSource.NORMAL_TRIGGER, 0));
jobEventBus.post(new JobExecutionEvent("fake_task_id", "test_event_bus_job", ExecutionSource.NORMAL_TRIGGER, 0));
verify(eventBus, times(0)).post(Matchers.<JobEvent>any());
}
}
@@ -32,7 +32,7 @@ public final class JobExecutionEventTest {

@Test
public void assertNewJobExecutionEvent() {
JobExecutionEvent actual = new JobExecutionEvent("test_job", JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER, 0);
JobExecutionEvent actual = new JobExecutionEvent("fake_task_id", "test_job", JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER, 0);
assertThat(actual.getJobName(), is("test_job"));
assertThat(actual.getSource(), is(JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER));
assertThat(actual.getShardingItem(), is(0));
@@ -45,15 +45,15 @@ public void assertNewJobExecutionEvent() {

@Test
public void assertExecutionSuccess() {
JobExecutionEvent actual = new JobExecutionEvent("test_job", JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER, 0);
JobExecutionEvent actual = new JobExecutionEvent("fake_task_id", "test_job", JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER, 0);
actual.executionSuccess();
assertNotNull(actual.getCompleteTime());
assertTrue(actual.isSuccess());
}

@Test
public void assertExecutionFailure() {
JobExecutionEvent actual = new JobExecutionEvent("test_job", JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER, 0);
JobExecutionEvent actual = new JobExecutionEvent("fake_task_id", "test_job", JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER, 0);
actual.executionFailure(new RuntimeException("failure"));
assertNotNull(actual.getCompleteTime());
assertFalse(actual.isSuccess());
@@ -66,7 +66,7 @@ public void assertPostJobTraceEvent() {

@Test
public void assertPostJobExecutionEvent() {
- JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(JOB_NAME, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER, 0);
+ JobExecutionEvent jobExecutionEvent = new JobExecutionEvent("fake_task_id", JOB_NAME, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER, 0);
jobEventBus.post(jobExecutionEvent);
verify(repository, atMost(1)).addJobExecutionEvent(jobExecutionEvent);
}
@@ -59,12 +59,12 @@ public void assertAddTraceLogWithLogLevel() throws SQLException {

@Test
public void assertAddJobExecutionEvent() throws SQLException {
assertTrue(storage.addJobExecutionEvent(new JobExecutionEvent("test_job", ExecutionSource.NORMAL_TRIGGER, 0)));
assertTrue(storage.addJobExecutionEvent(new JobExecutionEvent("fake_task_id", "test_job", ExecutionSource.NORMAL_TRIGGER, 0)));
}

@Test
public void assertUpdateJobExecutionEventWhenSuccess() throws SQLException {
JobExecutionEvent jobExecutionEvent = new JobExecutionEvent("test_job", ExecutionSource.NORMAL_TRIGGER, 0);
JobExecutionEvent jobExecutionEvent = new JobExecutionEvent("fake_task_id", "test_job", ExecutionSource.NORMAL_TRIGGER, 0);
assertTrue(storage.addJobExecutionEvent(jobExecutionEvent));
jobExecutionEvent.executionSuccess();
assertTrue(storage.addJobExecutionEvent(jobExecutionEvent));
@@ -73,7 +73,7 @@ public void assertUpdateJobExecutionEventWhenSuccess() throws SQLException {

@Test
public void assertUpdateJobExecutionEventWhenFailure() throws SQLException {
JobExecutionEvent jobExecutionEvent = new JobExecutionEvent("test_job", ExecutionSource.NORMAL_TRIGGER, 0);
JobExecutionEvent jobExecutionEvent = new JobExecutionEvent("fake_task_id", "test_job", ExecutionSource.NORMAL_TRIGGER, 0);
assertTrue(storage.addJobExecutionEvent(jobExecutionEvent));
jobExecutionEvent.executionFailure(new RuntimeException("failure"));
assertTrue(storage.addJobExecutionEvent(jobExecutionEvent));
@@ -82,7 +82,7 @@ public void assertUpdateJobExecutionEventWhenFailure() throws SQLException {

@Test
public void assertUpdateJobExecutionEventWhenFailureAndMessageExceed() throws SQLException {
JobExecutionEvent jobExecutionEvent = new JobExecutionEvent("test_job", ExecutionSource.NORMAL_TRIGGER, 0);
JobExecutionEvent jobExecutionEvent = new JobExecutionEvent("fake_task_id", "test_job", ExecutionSource.NORMAL_TRIGGER, 0);
assertTrue(storage.addJobExecutionEvent(jobExecutionEvent));
StringBuilder failureMsg = new StringBuilder();
for (int i = 0; i < 600; i++) {
@@ -48,35 +48,35 @@ public final class JobExecutorFactoryTest {

@Test
public void assertGetJobExecutorForScriptJob() {
when(jobFacade.getShardingContexts()).thenReturn(new ShardingContexts("script_test_job", 10, "", Collections.<Integer, String>emptyMap()));
when(jobFacade.getShardingContexts()).thenReturn(new ShardingContexts("fake_task_id", "script_test_job", 10, "", Collections.<Integer, String>emptyMap()));
when(jobFacade.loadJobRootConfiguration(true)).thenReturn(new TestScriptJobConfiguration("test.sh", IgnoreJobExceptionHandler.class));
assertThat(JobExecutorFactory.getJobExecutor(null, jobFacade), instanceOf(ScriptJobExecutor.class));
}

@Test
public void assertGetJobExecutorForSimpleJob() {
when(jobFacade.getShardingContexts()).thenReturn(new ShardingContexts("simple_test_job", 10, "", Collections.<Integer, String>emptyMap()));
when(jobFacade.getShardingContexts()).thenReturn(new ShardingContexts("fake_task_id", "simple_test_job", 10, "", Collections.<Integer, String>emptyMap()));
when(jobFacade.loadJobRootConfiguration(true)).thenReturn(new TestSimpleJobConfiguration());
assertThat(JobExecutorFactory.getJobExecutor(new TestSimpleJob(null), jobFacade), instanceOf(SimpleJobExecutor.class));
}

@Test
public void assertGetJobExecutorForDataflowJob() {
when(jobFacade.getShardingContexts()).thenReturn(new ShardingContexts("dataflow_test_job", 10, "", Collections.<Integer, String>emptyMap()));
when(jobFacade.getShardingContexts()).thenReturn(new ShardingContexts("fake_task_id", "dataflow_test_job", 10, "", Collections.<Integer, String>emptyMap()));
when(jobFacade.loadJobRootConfiguration(true)).thenReturn(new TestDataflowJobConfiguration(false));
assertThat(JobExecutorFactory.getJobExecutor(new TestDataflowJob(null), jobFacade), instanceOf(DataflowJobExecutor.class));
}

@Test(expected = JobConfigurationException.class)
public void assertGetJobExecutorWhenJobClassWhenUnsupportedJob() {
when(jobFacade.getShardingContexts()).thenReturn(new ShardingContexts("unsupported_test_job", 10, "", Collections.<Integer, String>emptyMap()));
when(jobFacade.getShardingContexts()).thenReturn(new ShardingContexts("fake_task_id", "unsupported_test_job", 10, "", Collections.<Integer, String>emptyMap()));
when(jobFacade.loadJobRootConfiguration(true)).thenReturn(new TestSimpleJobConfiguration());
JobExecutorFactory.getJobExecutor(new OtherJob(), jobFacade);
}

@Test
public void assertGetJobExecutorTwice() {
when(jobFacade.getShardingContexts()).thenReturn(new ShardingContexts("twice_test_job", 10, "", Collections.<Integer, String>emptyMap()));
when(jobFacade.getShardingContexts()).thenReturn(new ShardingContexts("fake_task_id", "twice_test_job", 10, "", Collections.<Integer, String>emptyMap()));
when(jobFacade.loadJobRootConfiguration(true)).thenReturn(new TestDataflowJobConfiguration(false));
AbstractElasticJobExecutor executor = JobExecutorFactory.getJobExecutor(new TestSimpleJob(null), jobFacade);
AbstractElasticJobExecutor anotherExecutor = JobExecutorFactory.getJobExecutor(new TestSimpleJob(null), jobFacade);
@@ -84,7 +84,7 @@ public void assertExecuteWhenCheckMaxTimeDiffSecondsIntolerable() throws JobExec

@Test
public void assertExecuteWhenPreviousJobStillRunning() throws JobExecutionEnvironmentException {
ShardingContexts shardingContexts = new ShardingContexts("test_job", 10, "", Collections.<Integer, String>emptyMap());
ShardingContexts shardingContexts = new ShardingContexts("fake_task_id", "test_job", 10, "", Collections.<Integer, String>emptyMap());
when(jobFacade.getShardingContexts()).thenReturn(shardingContexts);
when(jobFacade.misfireIfNecessary(shardingContexts.getShardingItemParameters().keySet())).thenReturn(true);
simpleJobExecutor.execute();
@@ -96,7 +96,7 @@ public void assertExecuteWhenPreviousJobStillRunning() throws JobExecutionEnviro

@Test
public void assertExecuteWhenShardingItemsIsEmpty() throws JobExecutionEnvironmentException {
ShardingContexts shardingContexts = new ShardingContexts("test_job", 10, "", Collections.<Integer, String>emptyMap());
ShardingContexts shardingContexts = new ShardingContexts("fake_task_id", "test_job", 10, "", Collections.<Integer, String>emptyMap());
ElasticJobVerify.prepareForIsNotMisfire(jobFacade, shardingContexts);
simpleJobExecutor.execute();
verify(jobFacade).checkJobExecutionEnvironment();
@@ -32,13 +32,13 @@ public final class ShardingContextsBuilder {
public static ShardingContexts getSingleShardingContexts() {
Map<Integer, String> map = new HashMap<>(1, 1);
map.put(0, "A");
- return new ShardingContexts(JOB_NAME, 1, "", map);
+ return new ShardingContexts("fake_task_id", JOB_NAME, 1, "", map);
}

public static ShardingContexts getMultipleShardingContexts() {
Map<Integer, String> map = new HashMap<>(2, 1);
map.put(0, "A");
map.put(1, "B");
- return new ShardingContexts(JOB_NAME, 2, "", map);
+ return new ShardingContexts("fake_task_id", JOB_NAME, 2, "", map);
}
}
(Diffs for the remaining changed files are not shown.)