Skip to content

Commit

Permalink
for #153 refactor JobEventBus's constructor
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Nov 8, 2016
1 parent a2f9aca commit 350f2c5
Show file tree
Hide file tree
Showing 15 changed files with 113 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,47 +20,46 @@
import com.dangdang.ddframe.job.util.concurrent.ExecutorServiceObject;
import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.EventBus;
import lombok.Getter;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;

/**
* 运行痕迹事件总线.
*
* @author zhangliang
* @author caohao
*/
@Slf4j
public class JobEventBus {

private final Collection<JobEventConfiguration> jobEventConfigs;
private final JobEventConfiguration jobEventConfig;

@Getter
private final ExecutorServiceObject executorServiceObject;

private final EventBus eventBus;

public JobEventBus(final JobEventConfiguration... jobEventConfigs) {
this.jobEventConfigs = getJobEventConfiguration(jobEventConfigs);
private boolean isRegistered;

public JobEventBus() {
jobEventConfig = null;
executorServiceObject = null;
eventBus = null;
}

public JobEventBus(final JobEventConfiguration jobEventConfig) {
this.jobEventConfig = jobEventConfig;
executorServiceObject = new ExecutorServiceObject("job-event", Runtime.getRuntime().availableProcessors() * 2);
eventBus = new AsyncEventBus(executorServiceObject.createExecutorService());
register();
}

private Collection<JobEventConfiguration> getJobEventConfiguration(final JobEventConfiguration... jobEventConfigs) {
Map<String, JobEventConfiguration> result = new HashMap<>(jobEventConfigs.length, 1);
for (JobEventConfiguration each : jobEventConfigs) {
result.put(each.getIdentity(), each);
}
return result.values();
}

private void register() {
for (JobEventConfiguration each : jobEventConfigs) {
if (null != each) {
eventBus.register(each.createJobEventListener());
}
try {
eventBus.register(jobEventConfig.createJobEventListener());
isRegistered = true;
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
log.error("Elastic job: create JobEventRdbListener failure, error is: ", ex);
}
}

Expand All @@ -70,7 +69,7 @@ private void register() {
* @param event 作业事件
*/
public void post(final JobEvent event) {
if (!jobEventConfigs.isEmpty() && !executorServiceObject.isShutdown()) {
if (isRegistered && !executorServiceObject.isShutdown()) {
eventBus.post(event);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public interface JobEventConfiguration extends JobEventIdentity {
* 创建作业事件监听器.
*
* @return 作业事件监听器.
* @throws JobEventListenerConfigurationException 作业事件监听器配置异常
*/
JobEventListener createJobEventListener();
JobEventListener createJobEventListener() throws JobEventListenerConfigurationException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/

package com.dangdang.ddframe.job.event;

/**
* 作业事件监听器配置异常.
*
* @author zhangliang
*/
public class JobEventListenerConfigurationException extends Exception {

private static final long serialVersionUID = 4069519372148227761L;

public JobEventListenerConfigurationException(final Exception ex) {
super(ex);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.dangdang.ddframe.job.event.JobEventConfiguration;
import com.dangdang.ddframe.job.event.JobEventListener;
import com.dangdang.ddframe.job.event.JobEventListenerConfigurationException;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -42,12 +43,11 @@ public class JobEventRdbConfiguration extends JobEventRdbIdentity implements Job
private final DataSource dataSource;

@Override
public JobEventListener createJobEventListener() {
public JobEventListener createJobEventListener() throws JobEventListenerConfigurationException {
try {
return new JobEventRdbListener(dataSource);
} catch (final SQLException ex) {
log.error("Elastic job: create JobEventRdbListener failure, error is: ", ex);
throw new JobEventListenerConfigurationException(ex);
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.dangdang.ddframe.job.event.fixture.JobEventCaller;
import com.dangdang.ddframe.job.event.fixture.TestJobEventConfiguration;
import com.dangdang.ddframe.job.event.fixture.TestJobEventListener;
import com.dangdang.ddframe.job.event.rdb.JobEventRdbConfiguration;
import com.dangdang.ddframe.job.event.type.JobExecutionEvent;
import com.dangdang.ddframe.job.event.type.JobExecutionEvent.ExecutionSource;
import com.google.common.eventbus.EventBus;
Expand Down Expand Up @@ -61,4 +62,12 @@ public void assertPostWithoutListener() throws NoSuchFieldException {
jobEventBus.post(new JobExecutionEvent("fake_task_id", "test_event_bus_job", ExecutionSource.NORMAL_TRIGGER, 0));
verify(eventBus, times(0)).post(Matchers.<JobEvent>any());
}

@Test
public void assertPostWithListenerInvalid() throws NoSuchFieldException {
jobEventBus = new JobEventBus(new JobEventRdbConfiguration(null));
ReflectionUtils.setFieldValue(jobEventBus, "eventBus", eventBus);
jobEventBus.post(new JobExecutionEvent("fake_task_id", "test_event_bus_job", ExecutionSource.NORMAL_TRIGGER, 0));
verify(eventBus, times(0)).post(Matchers.<JobEvent>any());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.dangdang.ddframe.job.event.JobEventConfiguration;
import com.dangdang.ddframe.job.event.JobEventListener;
import com.dangdang.ddframe.job.event.JobEventListenerConfigurationException;
import lombok.RequiredArgsConstructor;

@RequiredArgsConstructor
Expand All @@ -27,7 +28,7 @@ public class TestJobEventConfiguration extends TestJobEventIdentity implements J
private final JobEventCaller jobEventCaller;

@Override
public JobEventListener createJobEventListener() {
public JobEventListener createJobEventListener() throws JobEventListenerConfigurationException {
return new TestJobEventListener(jobEventCaller);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,17 @@

package com.dangdang.ddframe.job.event.rdb;

import com.dangdang.ddframe.job.event.JobEventListenerConfigurationException;
import org.apache.commons.dbcp.BasicDataSource;
import org.junit.Test;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;

public final class JobEventRdbConfigurationTest {

@Test
public void assertCreateJobEventListenerSuccess() {
public void assertCreateJobEventListenerSuccess() throws JobEventListenerConfigurationException {
BasicDataSource dataSource = new BasicDataSource();
dataSource.setDriverClassName(org.h2.Driver.class.getName());
dataSource.setUrl("jdbc:h2:mem:job_event_storage");
Expand All @@ -36,8 +36,8 @@ public void assertCreateJobEventListenerSuccess() {
assertThat(new JobEventRdbConfiguration(dataSource).createJobEventListener(), instanceOf(JobEventRdbListener.class));
}

@Test
public void assertCreateJobEventListenerFailure() {
assertNull(new JobEventRdbConfiguration(new BasicDataSource()).createJobEventListener());
@Test(expected = JobEventListenerConfigurationException.class)
public void assertCreateJobEventListenerFailure() throws JobEventListenerConfigurationException {
new JobEventRdbConfiguration(new BasicDataSource()).createJobEventListener();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@
*/

package com.dangdang.ddframe.job.event.rdb;

import com.dangdang.ddframe.job.event.JobEventBus;
import com.dangdang.ddframe.job.event.JobEventListenerConfigurationException;
import com.dangdang.ddframe.job.event.type.JobExecutionEvent;
import com.dangdang.ddframe.job.event.type.JobStatusTraceEvent;
import com.dangdang.ddframe.job.event.type.JobStatusTraceEvent.State;
Expand Down Expand Up @@ -49,7 +50,7 @@ public final class JobEventRdbListenerTest {
private JobEventBus jobEventBus;

@Before
public void setUp() throws SQLException, NoSuchFieldException {
public void setUp() throws JobEventListenerConfigurationException, SQLException, NoSuchFieldException {
BasicDataSource dataSource = new BasicDataSource();
dataSource.setDriverClassName(org.h2.Driver.class.getName());
dataSource.setUrl("jdbc:h2:mem:job_event_storage");
Expand Down
3 changes: 2 additions & 1 deletion elastic-job-core/src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
<appender-ref ref="STDOUT" />
</root>

<logger name="com.dangdang.ddframe.job.event.log.JobEventLogListener" level="OFF" />
<logger name="com.dangdang.ddframe.job.event.rdb.JobEventRdbConfiguration" level="OFF" />
<logger name="com.dangdang.ddframe.job.event.JobEventBus" level="OFF" />
<logger name="com.dangdang.ddframe.job.executor.handler.impl.DefaultJobExceptionHandler" level="OFF" />
</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.Collection;
import java.util.Collections;

public final class JavaLiteJobMain {

Expand All @@ -67,10 +65,10 @@ public static void main(final String[] args) throws Exception {
// CHECKSTYLE:ON
setUpEmbedZookeeperServer();
CoordinatorRegistryCenter regCenter = setUpRegistryCenter();
Collection<JobEventConfiguration> jobEventConfigs = Collections.<JobEventConfiguration>singletonList(new JobEventRdbConfiguration(setUpEventTraceDataSource()));
setUpSimpleJob(regCenter, jobEventConfigs);
setUpDataflowJob(regCenter, jobEventConfigs);
setUpScriptJob(regCenter, jobEventConfigs);
JobEventConfiguration jobEventConfig = new JobEventRdbConfiguration(setUpEventTraceDataSource());
setUpSimpleJob(regCenter, jobEventConfig);
setUpDataflowJob(regCenter, jobEventConfig);
setUpScriptJob(regCenter, jobEventConfig);
}

private static void setUpEmbedZookeeperServer() throws Exception {
Expand All @@ -93,22 +91,22 @@ private static DataSource setUpEventTraceDataSource() {
return result;
}

private static void setUpSimpleJob(final CoordinatorRegistryCenter regCenter, final Collection<JobEventConfiguration> jobEventConfigs) {
private static void setUpSimpleJob(final CoordinatorRegistryCenter regCenter, final JobEventConfiguration jobEventConfig) {
JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder("javaSimpleJob", "0/5 * * * * ?", 3).shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou").build();
SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(coreConfig, JavaSimpleJob.class.getCanonicalName());
new JobScheduler(regCenter, LiteJobConfiguration.newBuilder(simpleJobConfig).build(), jobEventConfigs, new SimpleListener(), new SimpleDistributeListener(1000L, 2000L)).init();
new JobScheduler(regCenter, LiteJobConfiguration.newBuilder(simpleJobConfig).build(), jobEventConfig, new SimpleListener(), new SimpleDistributeListener(1000L, 2000L)).init();
}

private static void setUpDataflowJob(final CoordinatorRegistryCenter regCenter, final Collection<JobEventConfiguration> jobEventConfigs) {
private static void setUpDataflowJob(final CoordinatorRegistryCenter regCenter, final JobEventConfiguration jobEventConfig) {
JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder("javaDataflowElasticJob", "0/5 * * * * ?", 3).shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou").build();
DataflowJobConfiguration dataflowJobConfig = new DataflowJobConfiguration(coreConfig, JavaDataflowJob.class.getCanonicalName(), true);
new JobScheduler(regCenter, LiteJobConfiguration.newBuilder(dataflowJobConfig).build(), jobEventConfigs).init();
new JobScheduler(regCenter, LiteJobConfiguration.newBuilder(dataflowJobConfig).build(), jobEventConfig).init();
}

private static void setUpScriptJob(final CoordinatorRegistryCenter regCenter, final Collection<JobEventConfiguration> jobEventConfigs) throws IOException {
private static void setUpScriptJob(final CoordinatorRegistryCenter regCenter, final JobEventConfiguration jobEventConfig) throws IOException {
JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder("scriptElasticJob", "0/5 * * * * ?", 3).build();
ScriptJobConfiguration scriptJobConfig = new ScriptJobConfiguration(coreConfig, buildScriptCommandLine());
new JobScheduler(regCenter, LiteJobConfiguration.newBuilder(scriptJobConfig).build(), jobEventConfigs).init();
new JobScheduler(regCenter, LiteJobConfiguration.newBuilder(scriptJobConfig).build(), jobEventConfig).init();
}

private static String buildScriptCommandLine() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@
<appender-ref ref="STDOUT" />
</root>

<logger name="org.apache.zookeeper" level="WARN"/>
<logger name="org.apache.curator" level="WARN"/>
<logger name="org.apache.zookeeper" level="WARN" />
<logger name="org.apache.curator" level="WARN" />
</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@
import org.quartz.plugins.management.ShutdownHookPlugin;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;

/**
Expand All @@ -65,12 +63,14 @@ public class JobScheduler {
private final JobFacade jobFacade;

public JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final ElasticJobListener... elasticJobListeners) {
this(regCenter, liteJobConfig, Collections.<JobEventConfiguration>emptyList(), elasticJobListeners);
JobEventBus jobEventBus = new JobEventBus();
jobExecutor = new JobExecutor(regCenter, liteJobConfig, elasticJobListeners);
jobFacade = new LiteJobFacade(regCenter, liteJobConfig.getJobName(), Arrays.asList(elasticJobListeners), jobEventBus);
}

public JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final Collection<JobEventConfiguration> jobEventConfigs,
public JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final JobEventConfiguration jobEventConfig,
final ElasticJobListener... elasticJobListeners) {
JobEventBus jobEventBus = new JobEventBus(jobEventConfigs.toArray(new JobEventConfiguration[jobEventConfigs.size()]));
JobEventBus jobEventBus = new JobEventBus(jobEventConfig);
jobExecutor = new JobExecutor(regCenter, liteJobConfig, elasticJobListeners);
jobFacade = new LiteJobFacade(regCenter, liteJobConfig.getJobName(), Arrays.asList(elasticJobListeners), jobEventBus);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.junit.runners.Suite;
import org.junit.runners.Suite.SuiteClasses;


@RunWith(Suite.class)
@SuiteClasses({
JobAPIFactoryTest.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,10 @@ protected AbstractBeanDefinition parseInternal(final Element element, final Pars
factory.setDestroyMethodName("shutdown");
factory.addConstructorArgReference(element.getAttribute(REGISTRY_CENTER_REF_ATTRIBUTE));
factory.addConstructorArgValue(createJobConfiguration(element));
factory.addConstructorArgValue(createJobEventConfigs(element));
BeanDefinition jobEventConfig = createJobEventConfig(element);
if (null != jobEventConfig) {
factory.addConstructorArgValue(jobEventConfig);
}
factory.addConstructorArgValue(createJobListeners(element));
return factory.getBeanDefinition();
}
Expand Down Expand Up @@ -102,15 +105,14 @@ private BeanDefinition createJobConfiguration(final Element element) {
return factory.getBeanDefinition();
}

private List<BeanDefinition> createJobEventConfigs(final Element element) {
List<BeanDefinition> result = new ManagedList<>();
private BeanDefinition createJobEventConfig(final Element element) {
String eventTraceRdbDs = element.getAttribute(EVENT_TRACE_DATA_SOURCE_ATTRIBUTE);
if (!Strings.isNullOrEmpty(eventTraceRdbDs)) {
BeanDefinitionBuilder factory = BeanDefinitionBuilder.rootBeanDefinition(JobEventRdbConfiguration.class);
factory.addConstructorArgReference(eventTraceRdbDs);
result.add(factory.getBeanDefinition());
if (Strings.isNullOrEmpty(eventTraceRdbDs)) {
return null;
}
return result;
BeanDefinitionBuilder factory = BeanDefinitionBuilder.rootBeanDefinition(JobEventRdbConfiguration.class);
factory.addConstructorArgReference(eventTraceRdbDs);
return factory.getBeanDefinition();
}

private List<BeanDefinition> createJobListeners(final Element element) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

import java.util.Collection;
import java.util.Properties;

/**
Expand All @@ -38,9 +37,13 @@ public class SpringJobScheduler extends JobScheduler implements ApplicationConte

private ApplicationContext applicationContext;

public SpringJobScheduler(final CoordinatorRegistryCenter regCenter, final AbstractJobConfigurationDto jobConfigDto, final Collection<JobEventConfiguration> jobEventConfigs,
final ElasticJobListener[]elasticJobListeners) {
super(regCenter, jobConfigDto.toLiteJobConfiguration(), jobEventConfigs, getTargetElasticJobListeners(elasticJobListeners));
public SpringJobScheduler(final CoordinatorRegistryCenter regCenter, final AbstractJobConfigurationDto jobConfigDto, final ElasticJobListener[]elasticJobListeners) {
super(regCenter, jobConfigDto.toLiteJobConfiguration(), getTargetElasticJobListeners(elasticJobListeners));
}

public SpringJobScheduler(final CoordinatorRegistryCenter regCenter, final AbstractJobConfigurationDto jobConfigDto,
final JobEventConfiguration jobEventConfig, final ElasticJobListener[]elasticJobListeners) {
super(regCenter, jobConfigDto.toLiteJobConfiguration(), jobEventConfig, getTargetElasticJobListeners(elasticJobListeners));
}

private static ElasticJobListener[] getTargetElasticJobListeners(final ElasticJobListener[] elasticJobListeners) {
Expand Down

0 comments on commit 350f2c5

Please sign in to comment.