From 11dd433d72c6f0028392b21124ffaea62c8d8711 Mon Sep 17 00:00:00 2001 From: terrymanu Date: Tue, 8 Mar 2016 16:51:44 +0800 Subject: [PATCH] fix #2 --- .../ddframe/job/api/JobScheduler.java | 52 +++++--- ...tractDistributeOnceElasticJobListener.java | 123 ++++++++++++++++++ .../job/api/listener/ElasticJobListener.java | 42 ++++++ .../job/exception/JobTimeoutException.java | 35 +++++ .../job/internal/env/LocalHostService.java | 2 +- .../ddframe/job/internal/env/TimeService.java | 35 +++++ .../guarantee/GuaranteeListenerManager.java | 82 ++++++++++++ .../job/internal/guarantee/GuaranteeNode.java | 57 ++++++++ .../internal/guarantee/GuaranteeService.java | 95 ++++++++++++++ .../job/internal/job/AbstractElasticJob.java | 32 ++++- .../internal/listener/ListenerManager.java | 10 +- .../dangdang/ddframe/job/api/AllApiTests.java | 4 +- .../ddframe/job/api/JobSchedulerTest.java | 42 ++++++ .../DistributeOnceElasticJobListenerTest.java | 115 ++++++++++++++++ .../fixture/ElasticJobListenerCaller.java | 23 ++++ .../TestDistributeOnceElasticJobListener.java | 41 ++++++ .../fixture/TestElasticJobListener.java | 39 ++++++ .../job/exception/AllExceptionTests.java | 3 +- .../exception/JobTimeoutExceptionTest.java | 31 +++++ .../AbstractBaseStdJobAutoInitTest.java | 6 + .../job/integrate/AbstractBaseStdJobTest.java | 52 ++++++-- ...amingThroughputDataFlowElasticJobTest.java | 2 +- .../job/internal/AllInternalTests.java | 10 +- .../job/internal/env/TimeServiceTest.java | 32 +++++ .../GuaranteeListenerManagerTest.java | 101 ++++++++++++++ .../internal/guarantee/GuaranteeNodeTest.java | 61 +++++++++ .../guarantee/GuaranteeServiceTest.java | 118 +++++++++++++++++ .../listener/ListenerManagerTest.java | 26 ++-- elastic-job-doc/content/post/user_guide.md | 81 ++++++++++++ .../core/job/SequenceDataFlowJobDemo.java | 7 +- .../example/elasticjob/core/main/JobMain.java | 23 +++- .../spring/job/SequenceDataFlowJobDemo.java | 13 +- 32 files changed, 1330 insertions(+), 65 deletions(-) create mode 100644 elastic-job-core/src/main/java/com/dangdang/ddframe/job/api/listener/AbstractDistributeOnceElasticJobListener.java create mode 100644 elastic-job-core/src/main/java/com/dangdang/ddframe/job/api/listener/ElasticJobListener.java create mode 100644 elastic-job-core/src/main/java/com/dangdang/ddframe/job/exception/JobTimeoutException.java create mode 100644 elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/env/TimeService.java create mode 100644 elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/guarantee/GuaranteeListenerManager.java create mode 100644 elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/guarantee/GuaranteeNode.java create mode 100644 elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/guarantee/GuaranteeService.java create mode 100644 elastic-job-core/src/test/java/com/dangdang/ddframe/job/api/listener/DistributeOnceElasticJobListenerTest.java create mode 100644 elastic-job-core/src/test/java/com/dangdang/ddframe/job/api/listener/fixture/ElasticJobListenerCaller.java create mode 100644 elastic-job-core/src/test/java/com/dangdang/ddframe/job/api/listener/fixture/TestDistributeOnceElasticJobListener.java create mode 100644 elastic-job-core/src/test/java/com/dangdang/ddframe/job/api/listener/fixture/TestElasticJobListener.java create mode 100644 elastic-job-core/src/test/java/com/dangdang/ddframe/job/exception/JobTimeoutExceptionTest.java create mode 100644 elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/env/TimeServiceTest.java create mode 100644 elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/guarantee/GuaranteeListenerManagerTest.java create mode 100644 elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/guarantee/GuaranteeNodeTest.java create mode 100644 elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/guarantee/GuaranteeServiceTest.java diff --git a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/api/JobScheduler.java b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/api/JobScheduler.java index 306b1d2fdc..c5e5a55514 100644 --- a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/api/JobScheduler.java +++ b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/api/JobScheduler.java @@ -17,27 +17,15 @@ package com.dangdang.ddframe.job.api; -import java.util.Date; -import java.util.List; -import java.util.Properties; - -import org.quartz.CronScheduleBuilder; -import org.quartz.CronTrigger; -import org.quartz.JobBuilder; -import org.quartz.JobDetail; -import org.quartz.Scheduler; -import org.quartz.SchedulerException; -import org.quartz.Trigger; -import org.quartz.TriggerBuilder; -import org.quartz.TriggerKey; -import org.quartz.impl.StdSchedulerFactory; - +import com.dangdang.ddframe.job.api.listener.AbstractDistributeOnceElasticJobListener; +import com.dangdang.ddframe.job.api.listener.ElasticJobListener; import com.dangdang.ddframe.job.exception.JobException; import com.dangdang.ddframe.job.internal.config.ConfigurationService; import com.dangdang.ddframe.job.internal.election.LeaderElectionService; import com.dangdang.ddframe.job.internal.execution.ExecutionContextService; import com.dangdang.ddframe.job.internal.execution.ExecutionService; import com.dangdang.ddframe.job.internal.failover.FailoverService; +import com.dangdang.ddframe.job.internal.guarantee.GuaranteeService; import com.dangdang.ddframe.job.internal.listener.ListenerManager; import com.dangdang.ddframe.job.internal.monitor.MonitorService; import com.dangdang.ddframe.job.internal.offset.OffsetService; @@ -48,8 +36,22 @@ import com.dangdang.ddframe.job.internal.statistics.StatisticsService; import com.dangdang.ddframe.reg.base.CoordinatorRegistryCenter; import com.google.common.base.Joiner; - import lombok.extern.slf4j.Slf4j; +import org.quartz.CronScheduleBuilder; +import org.quartz.CronTrigger; +import org.quartz.JobBuilder; +import org.quartz.JobDetail; +import org.quartz.Scheduler; +import org.quartz.SchedulerException; +import org.quartz.Trigger; +import org.quartz.TriggerBuilder; +import org.quartz.TriggerKey; +import org.quartz.impl.StdSchedulerFactory; + +import java.util.Arrays; +import java.util.Date; +import java.util.List; +import java.util.Properties; /** * 作业调度器. @@ -89,15 +91,19 @@ public class JobScheduler { private final OffsetService offsetService; private final MonitorService monitorService; + + private List elasticJobListeners; private Scheduler scheduler; private JobDetail jobDetail; - public JobScheduler(final CoordinatorRegistryCenter coordinatorRegistryCenter, final JobConfiguration jobConfiguration) { + public JobScheduler(final CoordinatorRegistryCenter coordinatorRegistryCenter, final JobConfiguration jobConfiguration, final ElasticJobListener... elasticJobListeners) { this.jobConfiguration = jobConfiguration; this.coordinatorRegistryCenter = coordinatorRegistryCenter; - listenerManager = new ListenerManager(coordinatorRegistryCenter, jobConfiguration); + this.elasticJobListeners = Arrays.asList(elasticJobListeners); + setGuaranteeServiceForElasticJobListeners(); + listenerManager = new ListenerManager(coordinatorRegistryCenter, jobConfiguration, this.elasticJobListeners); configService = new ConfigurationService(coordinatorRegistryCenter, jobConfiguration); leaderElectionService = new LeaderElectionService(coordinatorRegistryCenter, jobConfiguration); serverService = new ServerService(coordinatorRegistryCenter, jobConfiguration); @@ -111,6 +117,15 @@ public JobScheduler(final CoordinatorRegistryCenter coordinatorRegistryCenter, f jobDetail = JobBuilder.newJob(jobConfiguration.getJobClass()).withIdentity(jobConfiguration.getJobName()).build(); } + private void setGuaranteeServiceForElasticJobListeners() { + GuaranteeService guaranteeService = new GuaranteeService(coordinatorRegistryCenter, jobConfiguration); + for (ElasticJobListener each : elasticJobListeners) { + if (each instanceof AbstractDistributeOnceElasticJobListener) { + ((AbstractDistributeOnceElasticJobListener) each).setGuaranteeService(guaranteeService); + } + } + } + /** * 初始化作业. */ @@ -146,6 +161,7 @@ private void fillJobDetail() { jobDetail.getJobDataMap().put("executionService", executionService); jobDetail.getJobDataMap().put("failoverService", failoverService); jobDetail.getJobDataMap().put("offsetService", offsetService); + jobDetail.getJobDataMap().put("elasticJobListeners", elasticJobListeners); } private Scheduler initializeScheduler(final String jobName) throws SchedulerException { diff --git a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/api/listener/AbstractDistributeOnceElasticJobListener.java b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/api/listener/AbstractDistributeOnceElasticJobListener.java new file mode 100644 index 0000000000..c589fd68f0 --- /dev/null +++ b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/api/listener/AbstractDistributeOnceElasticJobListener.java @@ -0,0 +1,123 @@ +/** + * Copyright 1999-2015 dangdang.com. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *

+ */ + +package com.dangdang.ddframe.job.api.listener; + +import com.dangdang.ddframe.job.api.JobExecutionMultipleShardingContext; +import com.dangdang.ddframe.job.exception.JobTimeoutException; +import com.dangdang.ddframe.job.internal.env.TimeService; +import com.dangdang.ddframe.job.internal.guarantee.GuaranteeService; +import lombok.RequiredArgsConstructor; +import lombok.Setter; + +/** + * 在分布式作业中只执行一次的监听器. + * + * @author zhangliang + */ +@RequiredArgsConstructor +public abstract class AbstractDistributeOnceElasticJobListener implements ElasticJobListener { + + private final long startedTimeoutMills; + + private final Object startedWait = new Object(); + + private final long completedTimeoutMills; + + private final Object completedWait = new Object(); + + @Setter + private GuaranteeService guaranteeService; + + private TimeService timeService = new TimeService(); + + @Override + public final void beforeJobExecuted(final JobExecutionMultipleShardingContext shardingContext) { + guaranteeService.registerStart(shardingContext.getShardingItems()); + if (guaranteeService.isAllStarted()) { + doBeforeJobExecutedAtLastStarted(shardingContext); + guaranteeService.clearAllStartedInfo(); + return; + } + long before = timeService.getCurrentMillis(); + try { + synchronized (startedWait) { + startedWait.wait(startedTimeoutMills); + } + } catch (final InterruptedException ex) { + Thread.interrupted(); + } + if (timeService.getCurrentMillis() - before >= startedTimeoutMills) { + guaranteeService.clearAllStartedInfo(); + throw new JobTimeoutException(startedTimeoutMills); + } + } + + @Override + public final void afterJobExecuted(final JobExecutionMultipleShardingContext shardingContext) { + guaranteeService.registerComplete(shardingContext.getShardingItems()); + if (guaranteeService.isAllCompleted()) { + doAfterJobExecutedAtLastCompleted(shardingContext); + guaranteeService.clearAllCompletedInfo(); + return; + } + long before = timeService.getCurrentMillis(); + try { + synchronized (completedWait) { + completedWait.wait(completedTimeoutMills); + } + } catch (final InterruptedException ex) { + Thread.interrupted(); + } + if (timeService.getCurrentMillis() - before >= completedTimeoutMills) { + guaranteeService.clearAllCompletedInfo(); + throw new JobTimeoutException(completedTimeoutMills); + } + } + + /** + * 分布式环境中最后一个作业执行前的执行的方法. + * + * @param shardingContext 分片上下文 + */ + public abstract void doBeforeJobExecutedAtLastStarted(final JobExecutionMultipleShardingContext shardingContext); + + /** + * 分布式环境中最后一个作业执行后的执行的方法. + * + * @param shardingContext 分片上下文 + */ + public abstract void doAfterJobExecutedAtLastCompleted(final JobExecutionMultipleShardingContext shardingContext); + + /** + * 通知任务开始. + */ + public void notifyWaitingTaskStart() { + synchronized (startedWait) { + startedWait.notifyAll(); + } + } + + /** + * 通知任务结束. + */ + public void notifyWaitingTaskComplete() { + synchronized (completedWait) { + completedWait.notifyAll(); + } + } +} diff --git a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/api/listener/ElasticJobListener.java b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/api/listener/ElasticJobListener.java new file mode 100644 index 0000000000..5fe66b4a8f --- /dev/null +++ b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/api/listener/ElasticJobListener.java @@ -0,0 +1,42 @@ +/** + * Copyright 1999-2015 dangdang.com. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *

+ */ + +package com.dangdang.ddframe.job.api.listener; + +import com.dangdang.ddframe.job.api.JobExecutionMultipleShardingContext; + +/** + * 弹性化分布式作业监听器接口. + * + * @author zhangliang + */ +public interface ElasticJobListener { + + /** + * 作业执行前的执行的方法. + * + * @param shardingContext 分片上下文 + */ + void beforeJobExecuted(final JobExecutionMultipleShardingContext shardingContext); + + /** + * 作业执行后的执行的方法. + * + * @param shardingContext 分片上下文 + */ + void afterJobExecuted(final JobExecutionMultipleShardingContext shardingContext); +} diff --git a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/exception/JobTimeoutException.java b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/exception/JobTimeoutException.java new file mode 100644 index 0000000000..b77200efae --- /dev/null +++ b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/exception/JobTimeoutException.java @@ -0,0 +1,35 @@ +/** + * Copyright 1999-2015 dangdang.com. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *

+ */ + +package com.dangdang.ddframe.job.exception; + +/** + * 作业超时抛出的异常. + * + * @author zhangliang + */ +public final class JobTimeoutException extends JobException { + + /** + * 作业超时抛出的异常. + * + * @param timeoutMills 超时毫秒数 + */ + public JobTimeoutException(final long timeoutMills) { + super("Job timeout. timeout mills is %s.", timeoutMills); + } +} diff --git a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/env/LocalHostService.java b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/env/LocalHostService.java index 819ff33169..2f1c67632d 100644 --- a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/env/LocalHostService.java +++ b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/env/LocalHostService.java @@ -26,7 +26,7 @@ import com.dangdang.ddframe.job.exception.JobException; /** - * 获取真实本机网络的实现类. + * 获取真实本机网络的服务. * * @author zhangliang */ diff --git a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/env/TimeService.java b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/env/TimeService.java new file mode 100644 index 0000000000..61ddfaf9df --- /dev/null +++ b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/env/TimeService.java @@ -0,0 +1,35 @@ +/** + * Copyright 1999-2015 dangdang.com. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *

+ */ + +package com.dangdang.ddframe.job.internal.env; + +/** + * 获取时间的服务. + * + * @author zhangliang + */ +public class TimeService { + + /** + * 获取当前时间的毫秒数. + * + * @return 当前时间的毫秒数 + */ + public long getCurrentMillis() { + return System.currentTimeMillis(); + } +} diff --git a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/guarantee/GuaranteeListenerManager.java b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/guarantee/GuaranteeListenerManager.java new file mode 100644 index 0000000000..6a08511ae2 --- /dev/null +++ b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/guarantee/GuaranteeListenerManager.java @@ -0,0 +1,82 @@ +/** + * Copyright 1999-2015 dangdang.com. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *

+ */ + +package com.dangdang.ddframe.job.internal.guarantee; + +import com.dangdang.ddframe.job.api.JobConfiguration; +import com.dangdang.ddframe.job.api.listener.AbstractDistributeOnceElasticJobListener; +import com.dangdang.ddframe.job.api.listener.ElasticJobListener; +import com.dangdang.ddframe.job.internal.listener.AbstractJobListener; +import com.dangdang.ddframe.job.internal.listener.AbstractListenerManager; +import com.dangdang.ddframe.reg.base.CoordinatorRegistryCenter; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.TreeCacheEvent; +import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type; + +import java.util.List; + +/** + * 保证分布式任务全部开始和结束状态监听管理器. + * + * @author zhangliang + */ +public class GuaranteeListenerManager extends AbstractListenerManager { + + private final GuaranteeNode guaranteeNode; + + private final List elasticJobListeners; + + public GuaranteeListenerManager(final CoordinatorRegistryCenter coordinatorRegistryCenter, final JobConfiguration jobConfiguration, final List elasticJobListeners) { + super(coordinatorRegistryCenter, jobConfiguration); + this.guaranteeNode = new GuaranteeNode(jobConfiguration.getJobName()); + this.elasticJobListeners = elasticJobListeners; + } + + @Override + public void start() { + addDataListener(new StartedNodeRemovedJobListener()); + addDataListener(new CompletedNodeRemovedJobListener()); + } + + class StartedNodeRemovedJobListener extends AbstractJobListener { + + @Override + protected void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path) { + if (Type.NODE_REMOVED == event.getType() && guaranteeNode.isStartedRootNode(path)) { + for (ElasticJobListener each : elasticJobListeners) { + if (each instanceof AbstractDistributeOnceElasticJobListener) { + ((AbstractDistributeOnceElasticJobListener) each).notifyWaitingTaskStart(); + } + } + } + } + } + + class CompletedNodeRemovedJobListener extends AbstractJobListener { + + @Override + protected void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path) { + if (Type.NODE_REMOVED == event.getType() && guaranteeNode.isCompletedRootNode(path)) { + for (ElasticJobListener each : elasticJobListeners) { + if (each instanceof AbstractDistributeOnceElasticJobListener) { + ((AbstractDistributeOnceElasticJobListener) each).notifyWaitingTaskComplete(); + } + } + } + } + } +} diff --git a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/guarantee/GuaranteeNode.java b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/guarantee/GuaranteeNode.java new file mode 100644 index 0000000000..9d1f52ef06 --- /dev/null +++ b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/guarantee/GuaranteeNode.java @@ -0,0 +1,57 @@ +/** + * Copyright 1999-2015 dangdang.com. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *

+ */ + +package com.dangdang.ddframe.job.internal.guarantee; + +import com.dangdang.ddframe.job.internal.storage.JobNodePath; +import com.google.common.base.Joiner; + +/** + * 保证分布式任务全部开始和结束状态节点名称的常量类. + * + * @author zhangliang + */ +public final class GuaranteeNode { + + static final String ROOT = "guarantee"; + + static final String STARTED_ROOT = ROOT + "/started"; + + static final String COMPLETED_ROOT = ROOT + "/completed"; + + private final JobNodePath jobNodePath; + + GuaranteeNode(final String jobName) { + jobNodePath = new JobNodePath(jobName); + } + + static String getStartedNode(final int shardingItem) { + return Joiner.on("/").join(STARTED_ROOT, shardingItem); + } + + static String getCompletedNode(final int shardingItem) { + return Joiner.on("/").join(COMPLETED_ROOT, shardingItem); + } + + boolean isStartedRootNode(final String path) { + return jobNodePath.getFullPath(STARTED_ROOT).equals(path); + } + + boolean isCompletedRootNode(final String path) { + return jobNodePath.getFullPath(COMPLETED_ROOT).equals(path); + } +} diff --git a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/guarantee/GuaranteeService.java b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/guarantee/GuaranteeService.java new file mode 100644 index 0000000000..7750315958 --- /dev/null +++ b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/guarantee/GuaranteeService.java @@ -0,0 +1,95 @@ +/** + * Copyright 1999-2015 dangdang.com. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *

+ */ + +package com.dangdang.ddframe.job.internal.guarantee; + +import com.dangdang.ddframe.job.api.JobConfiguration; +import com.dangdang.ddframe.job.internal.storage.JobNodeStorage; +import com.dangdang.ddframe.reg.base.CoordinatorRegistryCenter; + +import java.util.Collection; + +/** + * 保证分布式任务全部开始和结束状态的服务. + * + * @author zhangliang + */ +public class GuaranteeService { + + private final JobConfiguration jobConfiguration; + + private final JobNodeStorage jobNodeStorage; + + public GuaranteeService(final CoordinatorRegistryCenter coordinatorRegistryCenter, final JobConfiguration jobConfiguration) { + this.jobConfiguration = jobConfiguration; + jobNodeStorage = new JobNodeStorage(coordinatorRegistryCenter, jobConfiguration); + } + + /** + * 根据分片项注册任务开始运行. + * + * @param shardingItems 待注册的分片项 + */ + public void registerStart(final Collection shardingItems) { + for (int each : shardingItems) { + jobNodeStorage.createJobNodeIfNeeded(GuaranteeNode.getStartedNode(each)); + } + } + + /** + * 判断是否所有的任务均启动完毕. + * + * @return 是否所有的任务均启动完毕 + */ + public boolean isAllStarted() { + return jobNodeStorage.isJobNodeExisted(GuaranteeNode.STARTED_ROOT) && jobConfiguration.getShardingTotalCount() == jobNodeStorage.getJobNodeChildrenKeys(GuaranteeNode.STARTED_ROOT).size(); + } + + /** + * 清理所有任务启动信息. + */ + public void clearAllStartedInfo() { + jobNodeStorage.removeJobNodeIfExisted(GuaranteeNode.STARTED_ROOT); + } + + /** + * 根据分片项注册任务完成运行. + * + * @param shardingItems 待注册的分片项 + */ + public void registerComplete(final Collection shardingItems) { + for (int each : shardingItems) { + jobNodeStorage.createJobNodeIfNeeded(GuaranteeNode.getCompletedNode(each)); + } + } + + /** + * 判断是否所有的任务均执行完毕. + * + * @return 是否所有的任务均执行完毕 + */ + public boolean isAllCompleted() { + return jobNodeStorage.isJobNodeExisted(GuaranteeNode.COMPLETED_ROOT) && jobConfiguration.getShardingTotalCount() == jobNodeStorage.getJobNodeChildrenKeys(GuaranteeNode.COMPLETED_ROOT).size(); + } + + /** + * 清理所有任务启动信息. + */ + public void clearAllCompletedInfo() { + jobNodeStorage.removeJobNodeIfExisted(GuaranteeNode.COMPLETED_ROOT); + } +} diff --git a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/job/AbstractElasticJob.java b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/job/AbstractElasticJob.java index 9c4cec1a32..06b26faf8b 100644 --- a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/job/AbstractElasticJob.java +++ b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/job/AbstractElasticJob.java @@ -17,11 +17,10 @@ package com.dangdang.ddframe.job.internal.job; -import org.quartz.JobExecutionContext; -import org.quartz.JobExecutionException; - import com.dangdang.ddframe.job.api.ElasticJob; import com.dangdang.ddframe.job.api.JobExecutionMultipleShardingContext; +import com.dangdang.ddframe.job.api.listener.ElasticJobListener; +import com.dangdang.ddframe.job.exception.JobException; import com.dangdang.ddframe.job.internal.config.ConfigurationService; import com.dangdang.ddframe.job.internal.execution.ExecutionContextService; import com.dangdang.ddframe.job.internal.execution.ExecutionService; @@ -29,11 +28,15 @@ import com.dangdang.ddframe.job.internal.offset.OffsetService; import com.dangdang.ddframe.job.internal.schedule.JobRegistry; import com.dangdang.ddframe.job.internal.sharding.ShardingService; - import lombok.AccessLevel; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; + +import java.util.ArrayList; +import java.util.List; /** * 弹性化分布式作业的基类. @@ -67,6 +70,9 @@ public abstract class AbstractElasticJob implements ElasticJob { @Getter(AccessLevel.PROTECTED) private OffsetService offsetService; + @Setter + private List elasticJobListeners = new ArrayList<>(); + @Override public final void execute(final JobExecutionContext context) throws JobExecutionException { log.trace("Elastic job: job execute begin, job execution context:{}.", context); @@ -77,6 +83,15 @@ public final void execute(final JobExecutionContext context) throws JobExecution log.debug("Elastic job: previous job is still running, new job will start after previous job completed. Misfired job had recorded."); return; } + if (!elasticJobListeners.isEmpty()) { + for (ElasticJobListener each : elasticJobListeners) { + try { + each.beforeJobExecuted(shardingContext); + } catch (final JobException ex) { + handleJobExecutionException(new JobExecutionException(ex)); + } + } + } executeJobInternal(shardingContext); log.trace("Elastic job: execute normal completed, sharding context:{}.", shardingContext); while (configService.isMisfire() && !executionService.getMisfiredJobItems(shardingContext.getShardingItems()).isEmpty() && !stoped && !shardingService.isNeedSharding()) { @@ -88,6 +103,15 @@ public final void execute(final JobExecutionContext context) throws JobExecution if (configService.isFailover() && !stoped) { failoverService.failoverIfNecessary(); } + if (!elasticJobListeners.isEmpty()) { + for (ElasticJobListener each : elasticJobListeners) { + try { + each.afterJobExecuted(shardingContext); + } catch (final JobException ex) { + handleJobExecutionException(new JobExecutionException(ex)); + } + } + } log.trace("Elastic job: execute all completed, job execution context:{}.", context); } diff --git a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/listener/ListenerManager.java b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/listener/ListenerManager.java index 0add3b37cd..81d61ab293 100644 --- a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/listener/ListenerManager.java +++ b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/listener/ListenerManager.java @@ -18,14 +18,18 @@ package com.dangdang.ddframe.job.internal.listener; import com.dangdang.ddframe.job.api.JobConfiguration; +import com.dangdang.ddframe.job.api.listener.ElasticJobListener; import com.dangdang.ddframe.job.internal.config.ConfigurationListenerManager; import com.dangdang.ddframe.job.internal.election.ElectionListenerManager; import com.dangdang.ddframe.job.internal.execution.ExecutionListenerManager; import com.dangdang.ddframe.job.internal.failover.FailoverListenerManager; +import com.dangdang.ddframe.job.internal.guarantee.GuaranteeListenerManager; import com.dangdang.ddframe.job.internal.server.JobOperationListenerManager; import com.dangdang.ddframe.job.internal.sharding.ShardingListenerManager; import com.dangdang.ddframe.reg.base.CoordinatorRegistryCenter; +import java.util.List; + /** * 作业注册中心的监听器管理者. * @@ -44,14 +48,17 @@ public class ListenerManager { private final JobOperationListenerManager jobOperationListenerManager; private final ConfigurationListenerManager configurationListenerManager; + + private final GuaranteeListenerManager guaranteeListenerManager; - public ListenerManager(final CoordinatorRegistryCenter coordinatorRegistryCenter, final JobConfiguration jobConfiguration) { + public ListenerManager(final CoordinatorRegistryCenter coordinatorRegistryCenter, final JobConfiguration jobConfiguration, final List elasticJobListeners) { electionListenerManager = new ElectionListenerManager(coordinatorRegistryCenter, jobConfiguration); shardingListenerManager = new ShardingListenerManager(coordinatorRegistryCenter, jobConfiguration); executionListenerManager = new ExecutionListenerManager(coordinatorRegistryCenter, jobConfiguration); failoverListenerManager = new FailoverListenerManager(coordinatorRegistryCenter, jobConfiguration); jobOperationListenerManager = new JobOperationListenerManager(coordinatorRegistryCenter, jobConfiguration); configurationListenerManager = new ConfigurationListenerManager(coordinatorRegistryCenter, jobConfiguration); + guaranteeListenerManager = new GuaranteeListenerManager(coordinatorRegistryCenter, jobConfiguration, elasticJobListeners); } /** @@ -64,5 +71,6 @@ public void startAllListeners() { failoverListenerManager.start(); jobOperationListenerManager.start(); configurationListenerManager.start(); + guaranteeListenerManager.start(); } } diff --git a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/api/AllApiTests.java b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/api/AllApiTests.java index 454bcb75ab..8261029738 100644 --- a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/api/AllApiTests.java +++ b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/api/AllApiTests.java @@ -17,6 +17,7 @@ package com.dangdang.ddframe.job.api; +import com.dangdang.ddframe.job.api.listener.DistributeOnceElasticJobListenerTest; import org.junit.runner.RunWith; import org.junit.runners.Suite; import org.junit.runners.Suite.SuiteClasses; @@ -24,7 +25,8 @@ @RunWith(Suite.class) @SuiteClasses({ JobExecutionMultipleShardingContextTest.class, - JobSchedulerTest.class + JobSchedulerTest.class, + DistributeOnceElasticJobListenerTest.class }) public final class AllApiTests { } diff --git a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/api/JobSchedulerTest.java b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/api/JobSchedulerTest.java index 772967ec18..6837b924f3 100644 --- a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/api/JobSchedulerTest.java +++ b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/api/JobSchedulerTest.java @@ -19,6 +19,7 @@ import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -29,10 +30,13 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.lang.reflect.Field; import java.util.Arrays; import java.util.Date; import java.util.List; +import com.dangdang.ddframe.job.api.listener.AbstractDistributeOnceElasticJobListener; +import com.dangdang.ddframe.job.api.listener.ElasticJobListener; import org.junit.Before; import org.junit.Test; import org.mockito.Matchers; @@ -128,6 +132,18 @@ public void initMocks() throws NoSuchFieldException { ReflectionUtils.setFieldValue(jobScheduler, "monitorService", monitorService); } + @Test + public void assertScheduleWithElasticJobListeners() { + JobScheduler jobScheduler = new JobScheduler(coordinatorRegistryCenter, jobConfig, new TestElasticJobListener(), new TestDistributeOnceElasticJobListener()); + List actual = ReflectionUtils.getFieldValue(jobScheduler, ReflectionUtils.getFieldWithName(JobScheduler.class, "elasticJobListeners", false)); + assertThat(actual.size(), is(2)); + assertThat(actual.get(0), instanceOf(TestElasticJobListener.class)); + assertThat(actual.get(1), instanceOf(TestDistributeOnceElasticJobListener.class)); + Field field = ReflectionUtils.getFieldWithName(TestDistributeOnceElasticJobListener.class, "guaranteeService", false); + field.setAccessible(true); + assertNotNull(field); + } + @Test public void assertInitIfIsMisfire() throws NoSuchFieldException, SchedulerException { when(configService.getCron()).thenReturn("* * 0/10 * * ? 2050"); @@ -373,4 +389,30 @@ public void assertSetField() throws NoSuchFieldException, SchedulerException { JobDetail jobDetail = ReflectionUtils.getFieldValue(jobScheduler, jobScheduler.getClass().getDeclaredField("jobDetail")); assertThat(jobDetail.getJobDataMap().get("fieldName").toString(), is("fieldValue")); } + + static class TestElasticJobListener implements ElasticJobListener { + + @Override + public void beforeJobExecuted(final JobExecutionMultipleShardingContext shardingContext) { + } + + @Override + public void afterJobExecuted(final JobExecutionMultipleShardingContext shardingContext) { + } + } + + static class TestDistributeOnceElasticJobListener extends AbstractDistributeOnceElasticJobListener { + + TestDistributeOnceElasticJobListener() { + super(500000L, 500000L); + } + + @Override + public void doBeforeJobExecutedAtLastStarted(final JobExecutionMultipleShardingContext shardingContext) { + } + + @Override + public void doAfterJobExecutedAtLastCompleted(final JobExecutionMultipleShardingContext shardingContext) { + } + } } diff --git a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/api/listener/DistributeOnceElasticJobListenerTest.java b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/api/listener/DistributeOnceElasticJobListenerTest.java new file mode 100644 index 0000000000..fa40d7f176 --- /dev/null +++ b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/api/listener/DistributeOnceElasticJobListenerTest.java @@ -0,0 +1,115 @@ +/** + * Copyright 1999-2015 dangdang.com. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *

+ */ + +package com.dangdang.ddframe.job.api.listener; + +import com.dangdang.ddframe.job.api.JobExecutionMultipleShardingContext; +import com.dangdang.ddframe.job.api.listener.fixture.ElasticJobListenerCaller; +import com.dangdang.ddframe.job.api.listener.fixture.TestDistributeOnceElasticJobListener; +import com.dangdang.ddframe.job.exception.JobTimeoutException; +import com.dangdang.ddframe.job.internal.env.TimeService; +import com.dangdang.ddframe.job.internal.guarantee.GuaranteeService; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.unitils.util.ReflectionUtils; + +import java.util.Arrays; + +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public final class DistributeOnceElasticJobListenerTest { + + @Mock + private GuaranteeService guaranteeService; + + @Mock + private TimeService timeService; + + @Mock + private ElasticJobListenerCaller elasticJobListenerCaller; + + private JobExecutionMultipleShardingContext shardingContext = new JobExecutionMultipleShardingContext(); + + private TestDistributeOnceElasticJobListener distributeOnceElasticJobListener; + + @Before + public void setUp() throws NoSuchFieldException { + MockitoAnnotations.initMocks(this); + distributeOnceElasticJobListener = new TestDistributeOnceElasticJobListener(elasticJobListenerCaller); + ReflectionUtils.setFieldValue(distributeOnceElasticJobListener, ReflectionUtils.getFieldWithName(AbstractDistributeOnceElasticJobListener.class, "guaranteeService", false), guaranteeService); + ReflectionUtils.setFieldValue(distributeOnceElasticJobListener, ReflectionUtils.getFieldWithName(AbstractDistributeOnceElasticJobListener.class, "timeService", false), timeService); + shardingContext.setShardingItems(Arrays.asList(0, 1)); + } + + @Test + public void testBeforeJobExecutedWhenIsAllStarted() { + when(guaranteeService.isAllStarted()).thenReturn(true); + distributeOnceElasticJobListener.beforeJobExecuted(shardingContext); + verify(guaranteeService).registerStart(Arrays.asList(0, 1)); + verify(elasticJobListenerCaller).call(); + verify(guaranteeService).clearAllStartedInfo(); + } + + @Test + public void testBeforeJobExecutedWhenIsNotAllStartedAndNotTimeout() { + when(guaranteeService.isAllStarted()).thenReturn(false); + when(timeService.getCurrentMillis()).thenReturn(0L); + distributeOnceElasticJobListener.beforeJobExecuted(shardingContext); + verify(guaranteeService).registerStart(Arrays.asList(0, 1)); + verify(guaranteeService, times(0)).clearAllStartedInfo(); + } + + @Test(expected = JobTimeoutException.class) + public void testBeforeJobExecutedWhenIsNotAllStartedAndTimeout() { + when(guaranteeService.isAllStarted()).thenReturn(false); + when(timeService.getCurrentMillis()).thenReturn(0L, 2L); + distributeOnceElasticJobListener.beforeJobExecuted(shardingContext); + verify(guaranteeService).registerStart(Arrays.asList(0, 1)); + verify(guaranteeService, times(0)).clearAllStartedInfo(); + } + + @Test + public void testAfterJobExecutedWhenIsAllCompleted() { + when(guaranteeService.isAllCompleted()).thenReturn(true); + distributeOnceElasticJobListener.afterJobExecuted(shardingContext); + verify(guaranteeService).registerComplete(Arrays.asList(0, 1)); + verify(elasticJobListenerCaller).call(); + verify(guaranteeService).clearAllCompletedInfo(); + } + + @Test + public void testAfterJobExecutedWhenIsAllCompletedAndNotTimeout() { + when(guaranteeService.isAllCompleted()).thenReturn(false); + when(timeService.getCurrentMillis()).thenReturn(0L); + distributeOnceElasticJobListener.afterJobExecuted(shardingContext); + verify(guaranteeService).registerComplete(Arrays.asList(0, 1)); + verify(guaranteeService, times(0)).clearAllCompletedInfo(); + } + + @Test(expected = JobTimeoutException.class) + public void testAfterJobExecutedWhenIsAllCompletedAndTimeout() { + when(guaranteeService.isAllCompleted()).thenReturn(false); + when(timeService.getCurrentMillis()).thenReturn(0L, 2L); + distributeOnceElasticJobListener.afterJobExecuted(shardingContext); + verify(guaranteeService).registerComplete(Arrays.asList(0, 1)); + verify(guaranteeService, times(0)).clearAllCompletedInfo(); + } +} diff --git a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/api/listener/fixture/ElasticJobListenerCaller.java b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/api/listener/fixture/ElasticJobListenerCaller.java new file mode 100644 index 0000000000..f9b5153720 --- /dev/null +++ b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/api/listener/fixture/ElasticJobListenerCaller.java @@ -0,0 +1,23 @@ +/** + * Copyright 1999-2015 dangdang.com. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *

+ */ + +package com.dangdang.ddframe.job.api.listener.fixture; + +public interface ElasticJobListenerCaller { + + void call(); +} diff --git a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/api/listener/fixture/TestDistributeOnceElasticJobListener.java b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/api/listener/fixture/TestDistributeOnceElasticJobListener.java new file mode 100644 index 0000000000..f3242db706 --- /dev/null +++ b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/api/listener/fixture/TestDistributeOnceElasticJobListener.java @@ -0,0 +1,41 @@ +/** + * Copyright 1999-2015 dangdang.com. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *

+ */ + +package com.dangdang.ddframe.job.api.listener.fixture; + +import com.dangdang.ddframe.job.api.JobExecutionMultipleShardingContext; +import com.dangdang.ddframe.job.api.listener.AbstractDistributeOnceElasticJobListener; + +public final class TestDistributeOnceElasticJobListener extends AbstractDistributeOnceElasticJobListener { + + private final ElasticJobListenerCaller caller; + + public TestDistributeOnceElasticJobListener(final ElasticJobListenerCaller caller) { + super(1L, 1L); + this.caller = caller; + } + + @Override + public void doBeforeJobExecutedAtLastStarted(final JobExecutionMultipleShardingContext shardingContext) { + caller.call(); + } + + @Override + public void doAfterJobExecutedAtLastCompleted(final JobExecutionMultipleShardingContext shardingContext) { + caller.call(); + } +} diff --git a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/api/listener/fixture/TestElasticJobListener.java b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/api/listener/fixture/TestElasticJobListener.java new file mode 100644 index 0000000000..406a6cf939 --- /dev/null +++ b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/api/listener/fixture/TestElasticJobListener.java @@ -0,0 +1,39 @@ +/** + * Copyright 1999-2015 dangdang.com. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *

+ */ + +package com.dangdang.ddframe.job.api.listener.fixture; + +import com.dangdang.ddframe.job.api.JobExecutionMultipleShardingContext; +import com.dangdang.ddframe.job.api.listener.AbstractDistributeOnceElasticJobListener; +import com.dangdang.ddframe.job.api.listener.ElasticJobListener; +import lombok.RequiredArgsConstructor; + +@RequiredArgsConstructor +public final class TestElasticJobListener implements ElasticJobListener { + + private final ElasticJobListenerCaller caller; + + @Override + public void beforeJobExecuted(final JobExecutionMultipleShardingContext shardingContext) { + caller.call(); + } + + @Override + public void afterJobExecuted(final JobExecutionMultipleShardingContext shardingContext) { + caller.call(); + } +} diff --git a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/exception/AllExceptionTests.java b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/exception/AllExceptionTests.java index b41a76c9dc..3932fb49c5 100644 --- a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/exception/AllExceptionTests.java +++ b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/exception/AllExceptionTests.java @@ -24,7 +24,8 @@ @RunWith(Suite.class) @SuiteClasses({ JobConflictExceptionTest.class, - TimeDiffIntolerableExceptionTest.class + TimeDiffIntolerableExceptionTest.class, + JobTimeoutExceptionTest.class }) public final class AllExceptionTests { } diff --git a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/exception/JobTimeoutExceptionTest.java b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/exception/JobTimeoutExceptionTest.java new file mode 100644 index 0000000000..5e7577874a --- /dev/null +++ b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/exception/JobTimeoutExceptionTest.java @@ -0,0 +1,31 @@ +/** + * Copyright 1999-2015 dangdang.com. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *

+ */ + +package com.dangdang.ddframe.job.exception; + +import org.junit.Test; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +public final class JobTimeoutExceptionTest { + + @Test + public void assertGetMessage() { + assertThat(new JobTimeoutException(5000L).getMessage(), is("Job timeout. timeout mills is 5000.")); + } +} diff --git a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/AbstractBaseStdJobAutoInitTest.java b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/AbstractBaseStdJobAutoInitTest.java index c61b635af5..1ca7128daf 100644 --- a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/AbstractBaseStdJobAutoInitTest.java +++ b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/AbstractBaseStdJobAutoInitTest.java @@ -17,6 +17,7 @@ package com.dangdang.ddframe.job.integrate; +import org.junit.After; import org.junit.Before; import com.dangdang.ddframe.job.api.ElasticJob; @@ -37,4 +38,9 @@ public void autoJobInit() { initJob(); assertRegCenterCommonInfo(); } + + @After + public void assertAfterJobRun() { + assertRegCenterListenerInfo(); + } } diff --git a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/AbstractBaseStdJobTest.java b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/AbstractBaseStdJobTest.java index 5dd253bcb7..1fc3c73288 100644 --- a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/AbstractBaseStdJobTest.java +++ b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/AbstractBaseStdJobTest.java @@ -17,20 +17,12 @@ package com.dangdang.ddframe.job.integrate; -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; - -import org.junit.After; -import org.junit.Before; -import org.junit.BeforeClass; -import org.quartz.SchedulerException; -import org.unitils.util.ReflectionUtils; - import com.dangdang.ddframe.job.api.ElasticJob; import com.dangdang.ddframe.job.api.JobConfiguration; +import com.dangdang.ddframe.job.api.JobExecutionMultipleShardingContext; import com.dangdang.ddframe.job.api.JobScheduler; +import com.dangdang.ddframe.job.api.listener.AbstractDistributeOnceElasticJobListener; +import com.dangdang.ddframe.job.api.listener.ElasticJobListener; import com.dangdang.ddframe.job.internal.election.LeaderElectionService; import com.dangdang.ddframe.job.internal.env.LocalHostService; import com.dangdang.ddframe.job.internal.schedule.JobRegistry; @@ -40,9 +32,18 @@ import com.dangdang.ddframe.reg.base.CoordinatorRegistryCenter; import com.dangdang.ddframe.reg.zookeeper.ZookeeperConfiguration; import com.dangdang.ddframe.reg.zookeeper.ZookeeperRegistryCenter; - import lombok.AccessLevel; import lombok.Getter; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.quartz.SchedulerException; +import org.unitils.util.ReflectionUtils; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; public abstract class AbstractBaseStdJobTest extends AbstractNestedZookeeperBaseTest { @@ -70,7 +71,27 @@ public abstract class AbstractBaseStdJobTest extends AbstractNestedZookeeperBase protected AbstractBaseStdJobTest(final Class elasticJobClass, final boolean disabled) { jobConfig = new JobConfiguration(jobName, elasticJobClass, 3, "0/1 * * * * ?"); - jobScheduler = new JobScheduler(regCenter, jobConfig); + jobScheduler = new JobScheduler(regCenter, jobConfig, new ElasticJobListener() { + + @Override + public void beforeJobExecuted(final JobExecutionMultipleShardingContext shardingContext) { + regCenter.persist("/" + jobName + "/listener/every", "test"); + } + + @Override + public void afterJobExecuted(final JobExecutionMultipleShardingContext shardingContext) { + } + }, new AbstractDistributeOnceElasticJobListener(500000L, 500000L) { + + @Override + public void doBeforeJobExecutedAtLastStarted(final JobExecutionMultipleShardingContext shardingContext) { + regCenter.persist("/" + jobName + "/listener/once", "test"); + } + + @Override + public void doAfterJobExecutedAtLastCompleted(final JobExecutionMultipleShardingContext shardingContext) { + } + }); this.disabled = disabled; monitorPort = -1; leaderElectionService = new LeaderElectionService(regCenter, jobConfig); @@ -131,4 +152,9 @@ protected void assertRegCenterCommonInfo() { regCenter.remove("/" + jobName + "/leader/election"); assertTrue(leaderElectionService.isLeader()); } + + protected void assertRegCenterListenerInfo() { + assertThat(regCenter.get("/" + jobName + "/listener/once"), is("test")); + assertThat(regCenter.get("/" + jobName + "/listener/every"), is("test")); + } } diff --git a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/std/dataflow/throughput/StreamingThroughputDataFlowElasticJobTest.java b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/std/dataflow/throughput/StreamingThroughputDataFlowElasticJobTest.java index 06459a4ee9..66fd8d297c 100644 --- a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/std/dataflow/throughput/StreamingThroughputDataFlowElasticJobTest.java +++ b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/std/dataflow/throughput/StreamingThroughputDataFlowElasticJobTest.java @@ -48,7 +48,7 @@ public void assertJobInit() { WaitingUtils.waitingShortTime(); } assertTrue(getRegCenter().isExisted("/" + getJobName() + "/execution")); - assertThat(ProcessCountStatistics.getProcessSuccessCount(getJobName()), is(10)); + assertTrue(ProcessCountStatistics.getProcessSuccessCount(getJobName()) >= 10); assertThat(ProcessCountStatistics.getProcessFailureCount(getJobName()), is(0)); } } diff --git a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/AllInternalTests.java b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/AllInternalTests.java index c047a3cd06..d0ccc89f2c 100644 --- a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/AllInternalTests.java +++ b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/AllInternalTests.java @@ -24,6 +24,7 @@ import com.dangdang.ddframe.job.internal.election.ElectionNodeTest; import com.dangdang.ddframe.job.internal.election.LeaderElectionServiceTest; import com.dangdang.ddframe.job.internal.env.LocalHostServiceTest; +import com.dangdang.ddframe.job.internal.env.TimeServiceTest; import com.dangdang.ddframe.job.internal.execution.ExecutionContextServiceTest; import com.dangdang.ddframe.job.internal.execution.ExecutionListenerManagerTest; import com.dangdang.ddframe.job.internal.execution.ExecutionNodeTest; @@ -31,6 +32,8 @@ import com.dangdang.ddframe.job.internal.failover.FailoverListenerManagerTest; import com.dangdang.ddframe.job.internal.failover.FailoverNodeTest; import com.dangdang.ddframe.job.internal.failover.FailoverServiceTest; +import com.dangdang.ddframe.job.internal.guarantee.GuaranteeNodeTest; +import com.dangdang.ddframe.job.internal.guarantee.GuaranteeServiceTest; import com.dangdang.ddframe.job.internal.listener.JobListenerTest; import com.dangdang.ddframe.job.internal.listener.ListenerManagerTest; import com.dangdang.ddframe.job.internal.monitor.MonitorServiceDisableTest; @@ -63,7 +66,8 @@ JobNodeStorageTest.class, ItemUtilsTest.class, SensitiveInfoUtilsTest.class, - LocalHostServiceTest.class, + LocalHostServiceTest.class, + TimeServiceTest.class, ConfigurationServiceTest.class, ConfigurationNodeTest.class, ConfigurationListenerManagerTest.class, @@ -94,7 +98,9 @@ ListenerManagerTest.class, JobListenerTest.class, MonitorServiceEnableTest.class, - MonitorServiceDisableTest.class + MonitorServiceDisableTest.class, + GuaranteeNodeTest.class, + GuaranteeServiceTest.class }) public final class AllInternalTests { } diff --git a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/env/TimeServiceTest.java b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/env/TimeServiceTest.java new file mode 100644 index 0000000000..9fcae95d0a --- /dev/null +++ b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/env/TimeServiceTest.java @@ -0,0 +1,32 @@ +/** + * Copyright 1999-2015 dangdang.com. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *

+ */ + +package com.dangdang.ddframe.job.internal.env; + +import org.junit.Test; + +import static org.junit.Assert.*; + +public class TimeServiceTest { + + private TimeService timeService = new TimeService(); + + @Test + public void testGetCurrentMillis() throws Exception { + assertTrue(timeService.getCurrentMillis() <= System.currentTimeMillis()); + } +} \ No newline at end of file diff --git a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/guarantee/GuaranteeListenerManagerTest.java b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/guarantee/GuaranteeListenerManagerTest.java new file mode 100644 index 0000000000..b45fbeabe4 --- /dev/null +++ b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/guarantee/GuaranteeListenerManagerTest.java @@ -0,0 +1,101 @@ +/** + * Copyright 1999-2015 dangdang.com. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *

+ */ + +package com.dangdang.ddframe.job.internal.guarantee; + +import com.dangdang.ddframe.job.api.JobConfiguration; +import com.dangdang.ddframe.job.api.listener.AbstractDistributeOnceElasticJobListener; +import com.dangdang.ddframe.job.api.listener.ElasticJobListener; +import com.dangdang.ddframe.job.fixture.TestJob; +import com.dangdang.ddframe.job.internal.listener.AbstractJobListener; +import com.dangdang.ddframe.job.internal.storage.JobNodeStorage; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.TreeCacheEvent; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Matchers; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.unitils.util.ReflectionUtils; + +import java.util.Arrays; + +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public final class GuaranteeListenerManagerTest { + + @Mock + private JobNodeStorage jobNodeStorage; + + @Mock + private ElasticJobListener elasticJobListener; + + @Mock + private AbstractDistributeOnceElasticJobListener distributeOnceElasticJobListener; + + private GuaranteeListenerManager guaranteeListenerManager; + + @Before + public void setUp() throws NoSuchFieldException { + MockitoAnnotations.initMocks(this); + guaranteeListenerManager = new GuaranteeListenerManager(null, new JobConfiguration("testJob", TestJob.class, 3, "0/1 * * * * ?"), Arrays.asList(elasticJobListener, distributeOnceElasticJobListener)); + ReflectionUtils.setFieldValue(guaranteeListenerManager, guaranteeListenerManager.getClass().getSuperclass().getDeclaredField("jobNodeStorage"), jobNodeStorage); + } + + @Test + public void assertStart() { + guaranteeListenerManager.start(); + verify(jobNodeStorage, times(2)).addDataListener(Matchers.any()); + } + + @Test + public void assertStartedNodeRemovedJobListenerWhenIsNotRemoved() { + guaranteeListenerManager.new StartedNodeRemovedJobListener().dataChanged(null, new TreeCacheEvent(TreeCacheEvent.Type.NODE_UPDATED, new ChildData("/testJob/guarantee/started", null, "".getBytes())), "/testJob/guarantee/started"); + verify(distributeOnceElasticJobListener, times(0)).notifyWaitingTaskStart(); + } + + @Test + public void assertStartedNodeRemovedJobListenerWhenIsNotStartedNode() { + guaranteeListenerManager.new StartedNodeRemovedJobListener().dataChanged(null, new TreeCacheEvent(TreeCacheEvent.Type.NODE_REMOVED, new ChildData("/otherJob/guarantee/started", null, "".getBytes())), "/otherJob/guarantee/started"); + verify(distributeOnceElasticJobListener, times(0)).notifyWaitingTaskStart(); + } + + @Test + public void assertStartedNodeRemovedJobListenerWhenIsRemovedAndStartedNode() { + guaranteeListenerManager.new StartedNodeRemovedJobListener().dataChanged(null, new TreeCacheEvent(TreeCacheEvent.Type.NODE_REMOVED, new ChildData("/testJob/guarantee/started", null, "".getBytes())), "/testJob/guarantee/started"); + verify(distributeOnceElasticJobListener).notifyWaitingTaskStart(); + } + + @Test + public void assertCompletedNodeRemovedJobListenerWhenIsNotRemoved() { + guaranteeListenerManager.new CompletedNodeRemovedJobListener().dataChanged(null, new TreeCacheEvent(TreeCacheEvent.Type.NODE_UPDATED, new ChildData("/testJob/guarantee/completed", null, "".getBytes())), "/testJob/guarantee/completed"); + verify(distributeOnceElasticJobListener, times(0)).notifyWaitingTaskStart(); + } + + @Test + public void assertCompletedNodeRemovedJobListenerWhenIsNotCompletedNode() { + guaranteeListenerManager.new CompletedNodeRemovedJobListener().dataChanged(null, new TreeCacheEvent(TreeCacheEvent.Type.NODE_REMOVED, new ChildData("/otherJob/guarantee/completed", null, "".getBytes())), "/otherJob/guarantee/completed"); + verify(distributeOnceElasticJobListener, times(0)).notifyWaitingTaskStart(); + } + + @Test + public void assertCompletedNodeRemovedJobListenerWhenIsRemovedAndCompletedNode() { + guaranteeListenerManager.new CompletedNodeRemovedJobListener().dataChanged(null, new TreeCacheEvent(TreeCacheEvent.Type.NODE_REMOVED, new ChildData("/testJob/guarantee/completed", null, "".getBytes())), "/testJob/guarantee/completed"); + verify(distributeOnceElasticJobListener).notifyWaitingTaskComplete(); + } +} diff --git a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/guarantee/GuaranteeNodeTest.java b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/guarantee/GuaranteeNodeTest.java new file mode 100644 index 0000000000..46eb4e81e1 --- /dev/null +++ b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/guarantee/GuaranteeNodeTest.java @@ -0,0 +1,61 @@ +/** + * Copyright 1999-2015 dangdang.com. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *

+ */ + +package com.dangdang.ddframe.job.internal.guarantee; + +import org.junit.Assert; +import org.junit.Test; + +import static junit.framework.TestCase.assertFalse; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +public final class GuaranteeNodeTest { + + private GuaranteeNode guaranteeNode = new GuaranteeNode("testJob"); + + @Test + public void testGetStartedNode() { + assertThat(GuaranteeNode.getStartedNode(1), is("guarantee/started/1")); + } + + @Test + public void testGetCompletedNode() { + assertThat(GuaranteeNode.getCompletedNode(1), is("guarantee/completed/1")); + } + + @Test + public void testIsStartedRootNode() { + assertTrue(guaranteeNode.isStartedRootNode("/testJob/guarantee/started")); + } + + @Test + public void testIsNotStartedRootNode() { + assertFalse(guaranteeNode.isStartedRootNode("/otherJob/guarantee/started")); + } + + @Test + public void testIsCompletedRootNode() { + assertTrue(guaranteeNode.isCompletedRootNode("/testJob/guarantee/completed")); + } + + @Test + public void testIsNotCompletedRootNode() { + assertFalse(guaranteeNode.isCompletedRootNode("/otherJob/guarantee/completed")); + } +} diff --git a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/guarantee/GuaranteeServiceTest.java b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/guarantee/GuaranteeServiceTest.java new file mode 100644 index 0000000000..36def60524 --- /dev/null +++ b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/guarantee/GuaranteeServiceTest.java @@ -0,0 +1,118 @@ +/** + * Copyright 1999-2015 dangdang.com. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *

+ */ + +package com.dangdang.ddframe.job.internal.guarantee; + +import com.dangdang.ddframe.job.api.JobConfiguration; +import com.dangdang.ddframe.job.fixture.TestJob; +import com.dangdang.ddframe.job.internal.storage.JobNodeStorage; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.unitils.util.ReflectionUtils; + +import java.util.Arrays; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public final class GuaranteeServiceTest { + + @Mock + private JobNodeStorage jobNodeStorage; + + private final JobConfiguration jobConfig = new JobConfiguration("testJob", TestJob.class, 3, "0/1 * * * * ?"); + + private final GuaranteeService guaranteeService = new GuaranteeService(null, jobConfig); + + @Before + public void setUp() throws NoSuchFieldException { + MockitoAnnotations.initMocks(this); + ReflectionUtils.setFieldValue(guaranteeService, "jobNodeStorage", jobNodeStorage); + when(jobNodeStorage.getJobConfiguration()).thenReturn(jobConfig); + jobConfig.setOverwrite(true); + } + + @Test + public void testRegisterStart() { + guaranteeService.registerStart(Arrays.asList(0, 1)); + verify(jobNodeStorage).createJobNodeIfNeeded("guarantee/started/0"); + verify(jobNodeStorage).createJobNodeIfNeeded("guarantee/started/1"); + } + + @Test + public void testIsNotAllStartedWhenRootNodeIsNotExisted() { + when(jobNodeStorage.isJobNodeExisted("guarantee/started")).thenReturn(false); + assertFalse(guaranteeService.isAllStarted()); + } + + @Test + public void testIsNotAllStarted() { + when(jobNodeStorage.isJobNodeExisted("guarantee/started")).thenReturn(true); + when(jobNodeStorage.getJobNodeChildrenKeys("guarantee/started")).thenReturn(Arrays.asList("0", "1")); + assertFalse(guaranteeService.isAllStarted()); + } + + @Test + public void testIsAllStarted() { + when(jobNodeStorage.isJobNodeExisted("guarantee/started")).thenReturn(true); + when(jobNodeStorage.getJobNodeChildrenKeys("guarantee/started")).thenReturn(Arrays.asList("0", "1", "2")); + assertTrue(guaranteeService.isAllStarted()); + } + + @Test + public void testClearAllStartedInfo() { + guaranteeService.clearAllStartedInfo(); + verify(jobNodeStorage).removeJobNodeIfExisted("guarantee/started"); + } + + @Test + public void testRegisterComplete() { + guaranteeService.registerComplete(Arrays.asList(0, 1)); + verify(jobNodeStorage).createJobNodeIfNeeded("guarantee/completed/0"); + verify(jobNodeStorage).createJobNodeIfNeeded("guarantee/completed/1"); + } + + @Test + public void testIsNotAllCompletedWhenRootNodeIsNotExisted() { + when(jobNodeStorage.isJobNodeExisted("guarantee/completed")).thenReturn(false); + assertFalse(guaranteeService.isAllCompleted()); + } + + @Test + public void testIsNotAllCompleted() { + when(jobNodeStorage.isJobNodeExisted("guarantee/completed")).thenReturn(true); + when(jobNodeStorage.getJobNodeChildrenKeys("guarantee/completed")).thenReturn(Arrays.asList("0", "1")); + assertFalse(guaranteeService.isAllCompleted()); + } + + @Test + public void testIsAllCompleted() { + when(jobNodeStorage.isJobNodeExisted("guarantee/completed")).thenReturn(true); + when(jobNodeStorage.getJobNodeChildrenKeys("guarantee/completed")).thenReturn(Arrays.asList("0", "1", "2")); + assertTrue(guaranteeService.isAllCompleted()); + } + + @Test + public void testClearAllCompletedInfo() { + guaranteeService.clearAllCompletedInfo(); + verify(jobNodeStorage).removeJobNodeIfExisted("guarantee/completed"); + } +} diff --git a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/listener/ListenerManagerTest.java b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/listener/ListenerManagerTest.java index 825cade460..933350da42 100644 --- a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/listener/ListenerManagerTest.java +++ b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/listener/ListenerManagerTest.java @@ -17,22 +17,25 @@ package com.dangdang.ddframe.job.internal.listener; -import static org.mockito.Mockito.verify; - -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; -import org.unitils.util.ReflectionUtils; - import com.dangdang.ddframe.job.api.JobConfiguration; +import com.dangdang.ddframe.job.api.listener.ElasticJobListener; import com.dangdang.ddframe.job.fixture.TestJob; import com.dangdang.ddframe.job.internal.config.ConfigurationListenerManager; import com.dangdang.ddframe.job.internal.election.ElectionListenerManager; import com.dangdang.ddframe.job.internal.execution.ExecutionListenerManager; import com.dangdang.ddframe.job.internal.failover.FailoverListenerManager; +import com.dangdang.ddframe.job.internal.guarantee.GuaranteeListenerManager; import com.dangdang.ddframe.job.internal.server.JobOperationListenerManager; import com.dangdang.ddframe.job.internal.sharding.ShardingListenerManager; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.unitils.util.ReflectionUtils; + +import java.util.Collections; + +import static org.mockito.Mockito.verify; public class ListenerManagerTest { @@ -54,7 +57,10 @@ public class ListenerManagerTest { @Mock private ConfigurationListenerManager configurationListenerManager; - private final ListenerManager listenerManager = new ListenerManager(null, new JobConfiguration("testJob", TestJob.class, 3, "0/1 * * * * ?")); + @Mock + private GuaranteeListenerManager guaranteeListenerManager; + + private final ListenerManager listenerManager = new ListenerManager(null, new JobConfiguration("testJob", TestJob.class, 3, "0/1 * * * * ?"), Collections.emptyList()); @Before public void setUp() throws NoSuchFieldException { @@ -65,6 +71,7 @@ public void setUp() throws NoSuchFieldException { ReflectionUtils.setFieldValue(listenerManager, "failoverListenerManager", failoverListenerManager); ReflectionUtils.setFieldValue(listenerManager, "jobOperationListenerManager", jobOperationListenerManager); ReflectionUtils.setFieldValue(listenerManager, "configurationListenerManager", configurationListenerManager); + ReflectionUtils.setFieldValue(listenerManager, "guaranteeListenerManager", guaranteeListenerManager); } @Test @@ -76,5 +83,6 @@ public void assertStartAllListeners() { verify(failoverListenerManager).start(); verify(jobOperationListenerManager).start(); verify(configurationListenerManager).start(); + verify(guaranteeListenerManager).start(); } } diff --git a/elastic-job-doc/content/post/user_guide.md b/elastic-job-doc/content/post/user_guide.md index 811ec53610..a70f2d6c3c 100644 --- a/elastic-job-doc/content/post/user_guide.md +++ b/elastic-job-doc/content/post/user_guide.md @@ -118,6 +118,87 @@ public class XXXSimpleJob extends AbstractSimpleElasticJob { ``` +### 任务监听配置 +可以通过配置多个任务监听器,在任务执行前和执行后执行监听的方法。监听器分为每台作业节点均执行和分布式场景中仅单一节点执行两种。 + +#### 每台作业节点均执行的监听 +若作业处理作业服务器的文件,处理完成后删除文件,可考虑使用每个节点均执行清理任务。此类型任务实现简单,且无需考虑全局分布式任务是否完成,请尽量使用此类型监听器。 + +步骤: + +1. 定义监听器 + +```java +import com.dangdang.ddframe.job.api.JobExecutionMultipleShardingContext; +import com.dangdang.ddframe.job.api.listener.ElasticJobListener; + +public class MyElasticJobListener implements AbstractDistributeOnceElasticJobListener { + + @Override + public void beforeJobExecuted(final JobExecutionMultipleShardingContext shardingContext) { + // do something ... + } + + @Override + public void afterJobExecuted(final JobExecutionMultipleShardingContext shardingContext) { + // do something ... + } +} +``` + +2. 将监听器作为参数传入`JobScheduler` + +```java +public class JobMain { + + public static void main(final String[] args) { + new JobScheduler(regCenter, jobConfig, new MyElasticJobListener()).init(); + } +} +``` + +#### 分布式场景中仅单一节点执行的监听 +若作业处理数据库数据,处理完成后只需一个节点完成数据清理任务即可。此类型任务处理复杂,需同步分布式环境下作业的状态同步,提供了超时设置来避免作业不同步导致的死锁,请谨慎使用。 + +步骤: + +1. 定义监听器 + +```java +import com.dangdang.ddframe.job.api.JobExecutionMultipleShardingContext; +import com.dangdang.ddframe.job.api.listener.AbstractDistributeOnceElasticJobListener; + +public final class TestDistributeOnceElasticJobListener extends AbstractDistributeOnceElasticJobListener { + + public MyDistributeOnceElasticJobListener(final long startTimeoutMills, final long completeTimeoutMills) { + super(startTimeoutMills, completeTimeoutMills); + } + + @Override + public void doBeforeJobExecutedAtLastStarted(final JobExecutionMultipleShardingContext shardingContext) { + // do something ... + } + + @Override + public void doAfterJobExecutedAtLastCompleted(final JobExecutionMultipleShardingContext shardingContext) { + // do something ... + } +} +``` + +2. 将监听器作为参数传入`JobScheduler` + +```java +public class JobMain { + + public static void main(final String[] args) { + long startTimeoutMills = 5000L; + long completeTimeoutMills = 10000L; + new JobScheduler(regCenter, jobConfig, new MyDistributeOnceElasticJobListener(startTimeoutMills, completeTimeoutMills)).init(); + } +} +``` + ## 作业配置 与`Spring`容器配合使用作业,可以将作业`Bean`配置为`Spring Bean`,可在作业中通过依赖注入使用`Spring`容器管理的数据源等对象。可用`placeholder`占位符从属性文件中取值。 diff --git a/elastic-job-example/src/main/java/com/dangdang/example/elasticjob/core/job/SequenceDataFlowJobDemo.java b/elastic-job-example/src/main/java/com/dangdang/example/elasticjob/core/job/SequenceDataFlowJobDemo.java index a7f2dfa405..14400f2135 100644 --- a/elastic-job-example/src/main/java/com/dangdang/example/elasticjob/core/job/SequenceDataFlowJobDemo.java +++ b/elastic-job-example/src/main/java/com/dangdang/example/elasticjob/core/job/SequenceDataFlowJobDemo.java @@ -17,16 +17,15 @@ package com.dangdang.example.elasticjob.core.job; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - import com.dangdang.ddframe.job.api.JobExecutionSingleShardingContext; import com.dangdang.ddframe.job.plugin.job.type.dataflow.AbstractIndividualSequenceDataFlowElasticJob; import com.dangdang.example.elasticjob.fixture.entity.Foo; import com.dangdang.example.elasticjob.fixture.repository.FooRepository; import com.dangdang.example.elasticjob.utils.PrintContext; +import java.util.Collections; +import java.util.List; + public class SequenceDataFlowJobDemo extends AbstractIndividualSequenceDataFlowElasticJob { private PrintContext printContext = new PrintContext(SequenceDataFlowJobDemo.class); diff --git a/elastic-job-example/src/main/java/com/dangdang/example/elasticjob/core/main/JobMain.java b/elastic-job-example/src/main/java/com/dangdang/example/elasticjob/core/main/JobMain.java index 5e18efbe90..b8e010f777 100644 --- a/elastic-job-example/src/main/java/com/dangdang/example/elasticjob/core/main/JobMain.java +++ b/elastic-job-example/src/main/java/com/dangdang/example/elasticjob/core/main/JobMain.java @@ -18,7 +18,9 @@ package com.dangdang.example.elasticjob.core.main; import com.dangdang.ddframe.job.api.JobConfiguration; +import com.dangdang.ddframe.job.api.JobExecutionMultipleShardingContext; import com.dangdang.ddframe.job.api.JobScheduler; +import com.dangdang.ddframe.job.api.listener.AbstractDistributeOnceElasticJobListener; import com.dangdang.ddframe.reg.base.CoordinatorRegistryCenter; import com.dangdang.ddframe.reg.zookeeper.ZookeeperConfiguration; import com.dangdang.ddframe.reg.zookeeper.ZookeeperRegistryCenter; @@ -32,7 +34,7 @@ public final class JobMain { private final CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zkConfig); - private final JobConfiguration jobConfig1 = new JobConfiguration("simpleElasticDemoJob", SimpleJobDemo.class, 10, "0/5 * * * * ?"); + private final JobConfiguration jobConfig1 = new JobConfiguration("simpleElasticDemoJob", SimpleJobDemo.class, 10, "0/30 * * * * ?"); private final JobConfiguration jobConfig2 = new JobConfiguration("throughputDataFlowElasticDemoJob", ThroughputDataFlowJobDemo.class, 10, "0/5 * * * * ?"); @@ -48,8 +50,25 @@ public void init() { zkConfig.setNestedPort(4181); zkConfig.setNestedDataDir(String.format("target/test_zk_data/%s/", System.nanoTime())); regCenter.init(); - new JobScheduler(regCenter, jobConfig1).init(); + new JobScheduler(regCenter, jobConfig1, new SimpleDistributeOnceElasticJobListener()).init(); new JobScheduler(regCenter, jobConfig2).init(); new JobScheduler(regCenter, jobConfig3).init(); } + + class SimpleDistributeOnceElasticJobListener extends AbstractDistributeOnceElasticJobListener { + + public SimpleDistributeOnceElasticJobListener() { + super(1000L, 1000L); + } + + @Override + public void doBeforeJobExecutedAtLastStarted(final JobExecutionMultipleShardingContext shardingContext) { + System.out.println("------ before simple job start ------"); + } + + @Override + public void doAfterJobExecutedAtLastCompleted(final JobExecutionMultipleShardingContext shardingContext) { + System.out.println("------ after simple job start ------"); + } + } } diff --git a/elastic-job-example/src/main/java/com/dangdang/example/elasticjob/spring/job/SequenceDataFlowJobDemo.java b/elastic-job-example/src/main/java/com/dangdang/example/elasticjob/spring/job/SequenceDataFlowJobDemo.java index e1a6696d15..2020593a35 100644 --- a/elastic-job-example/src/main/java/com/dangdang/example/elasticjob/spring/job/SequenceDataFlowJobDemo.java +++ b/elastic-job-example/src/main/java/com/dangdang/example/elasticjob/spring/job/SequenceDataFlowJobDemo.java @@ -17,19 +17,16 @@ package com.dangdang.example.elasticjob.spring.job; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - -import javax.annotation.Resource; - -import org.springframework.stereotype.Component; - import com.dangdang.ddframe.job.api.JobExecutionSingleShardingContext; import com.dangdang.ddframe.job.plugin.job.type.dataflow.AbstractBatchSequenceDataFlowElasticJob; import com.dangdang.example.elasticjob.fixture.entity.Foo; import com.dangdang.example.elasticjob.fixture.repository.FooRepository; import com.dangdang.example.elasticjob.utils.PrintContext; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.util.Collections; +import java.util.List; @Component public class SequenceDataFlowJobDemo extends AbstractBatchSequenceDataFlowElasticJob {