diff --git a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/exception/JobTimeoutException.java b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/exception/JobTimeoutException.java index 5737c2447c..4008658105 100644 --- a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/exception/JobTimeoutException.java +++ b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/exception/JobTimeoutException.java @@ -23,7 +23,9 @@ * @author zhangliang */ public final class JobTimeoutException extends JobException { - + + private static final long serialVersionUID = 315323919916960589L; + /** * 作业超时抛出的异常. * diff --git a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/guarantee/GuaranteeService.java b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/guarantee/GuaranteeService.java index a3ddc9f3bb..e6a6df7ae0 100644 --- a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/guarantee/GuaranteeService.java +++ b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/guarantee/GuaranteeService.java @@ -18,6 +18,7 @@ package com.dangdang.ddframe.job.internal.guarantee; import com.dangdang.ddframe.job.api.JobConfiguration; +import com.dangdang.ddframe.job.internal.config.ConfigurationService; import com.dangdang.ddframe.job.internal.storage.JobNodeStorage; import com.dangdang.ddframe.reg.base.CoordinatorRegistryCenter; @@ -30,13 +31,13 @@ */ public class GuaranteeService { - private final JobConfiguration jobConfiguration; - private final JobNodeStorage jobNodeStorage; + private final ConfigurationService configService; + public GuaranteeService(final CoordinatorRegistryCenter coordinatorRegistryCenter, final JobConfiguration jobConfiguration) { - this.jobConfiguration = jobConfiguration; jobNodeStorage = new JobNodeStorage(coordinatorRegistryCenter, jobConfiguration); + configService = new ConfigurationService(coordinatorRegistryCenter, jobConfiguration); } /** @@ -56,7 +57,7 @@ public void registerStart(final Collection shardingItems) { * @return 是否所有的任务均启动完毕 */ public boolean isAllStarted() { - return jobNodeStorage.isJobNodeExisted(GuaranteeNode.STARTED_ROOT) && jobConfiguration.getShardingTotalCount() == jobNodeStorage.getJobNodeChildrenKeys(GuaranteeNode.STARTED_ROOT).size(); + return jobNodeStorage.isJobNodeExisted(GuaranteeNode.STARTED_ROOT) && configService.getShardingTotalCount() == jobNodeStorage.getJobNodeChildrenKeys(GuaranteeNode.STARTED_ROOT).size(); } /** @@ -83,7 +84,7 @@ public void registerComplete(final Collection shardingItems) { * @return 是否所有的任务均执行完毕 */ public boolean isAllCompleted() { - return jobNodeStorage.isJobNodeExisted(GuaranteeNode.COMPLETED_ROOT) && jobConfiguration.getShardingTotalCount() == jobNodeStorage.getJobNodeChildrenKeys(GuaranteeNode.COMPLETED_ROOT).size(); + return jobNodeStorage.isJobNodeExisted(GuaranteeNode.COMPLETED_ROOT) && configService.getShardingTotalCount() == jobNodeStorage.getJobNodeChildrenKeys(GuaranteeNode.COMPLETED_ROOT).size(); } /** diff --git a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/sharding/ShardingService.java b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/sharding/ShardingService.java index e09406b9df..9c6c6936d7 100644 --- a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/sharding/ShardingService.java +++ b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/sharding/ShardingService.java @@ -22,6 +22,8 @@ import com.dangdang.ddframe.job.internal.election.LeaderElectionService; import com.dangdang.ddframe.job.internal.env.LocalHostService; import com.dangdang.ddframe.job.internal.execution.ExecutionService; +import com.dangdang.ddframe.job.internal.reg.BlockUtils; +import com.dangdang.ddframe.job.internal.reg.ItemUtils; import com.dangdang.ddframe.job.internal.server.ServerService; import com.dangdang.ddframe.job.internal.sharding.strategy.JobShardingStrategy; import com.dangdang.ddframe.job.internal.sharding.strategy.JobShardingStrategyFactory; @@ -29,8 +31,6 @@ import com.dangdang.ddframe.job.internal.storage.JobNodePath; import com.dangdang.ddframe.job.internal.storage.JobNodeStorage; import com.dangdang.ddframe.job.internal.storage.TransactionExecutionCallback; -import com.dangdang.ddframe.job.internal.reg.BlockUtils; -import com.dangdang.ddframe.job.internal.reg.ItemUtils; import com.dangdang.ddframe.reg.base.CoordinatorRegistryCenter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; diff --git a/elastic-job-core/src/main/java/com/dangdang/ddframe/reg/zookeeper/ZookeeperRegistryCenter.java b/elastic-job-core/src/main/java/com/dangdang/ddframe/reg/zookeeper/ZookeeperRegistryCenter.java index 84008e5611..6d66a3bc58 100644 --- a/elastic-job-core/src/main/java/com/dangdang/ddframe/reg/zookeeper/ZookeeperRegistryCenter.java +++ b/elastic-job-core/src/main/java/com/dangdang/ddframe/reg/zookeeper/ZookeeperRegistryCenter.java @@ -103,7 +103,7 @@ public List getAclForPath(final String path) { client = builder.build(); client.start(); try { - client.blockUntilConnected(zkConfig.getMaxSleepTimeMilliseconds(), TimeUnit.MILLISECONDS); + client.blockUntilConnected(zkConfig.getMaxSleepTimeMilliseconds() * zkConfig.getMaxRetries(), TimeUnit.MILLISECONDS); if (!client.getZookeeperClient().isConnected()) { throw new KeeperException.OperationTimeoutException(); } diff --git a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/guarantee/GuaranteeServiceTest.java b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/guarantee/GuaranteeServiceTest.java index 57981870a4..e7742a5c43 100644 --- a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/guarantee/GuaranteeServiceTest.java +++ b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/guarantee/GuaranteeServiceTest.java @@ -19,6 +19,7 @@ import com.dangdang.ddframe.job.api.JobConfiguration; import com.dangdang.ddframe.job.fixture.TestJob; +import com.dangdang.ddframe.job.internal.config.ConfigurationService; import com.dangdang.ddframe.job.internal.storage.JobNodeStorage; import org.junit.Before; import org.junit.Test; @@ -38,6 +39,9 @@ public final class GuaranteeServiceTest { @Mock private JobNodeStorage jobNodeStorage; + @Mock + private ConfigurationService configService; + private final JobConfiguration jobConfig = new JobConfiguration("testJob", TestJob.class, 3, "0/1 * * * * ?"); private final GuaranteeService guaranteeService = new GuaranteeService(null, jobConfig); @@ -46,6 +50,7 @@ public final class GuaranteeServiceTest { public void setUp() throws NoSuchFieldException { MockitoAnnotations.initMocks(this); ReflectionUtils.setFieldValue(guaranteeService, "jobNodeStorage", jobNodeStorage); + ReflectionUtils.setFieldValue(guaranteeService, "configService", configService); when(jobNodeStorage.getJobConfiguration()).thenReturn(jobConfig); jobConfig.setOverwrite(true); } @@ -73,6 +78,7 @@ public void testIsNotAllStarted() { @Test public void testIsAllStarted() { when(jobNodeStorage.isJobNodeExisted("guarantee/started")).thenReturn(true); + when(configService.getShardingTotalCount()).thenReturn(3); when(jobNodeStorage.getJobNodeChildrenKeys("guarantee/started")).thenReturn(Arrays.asList("0", "1", "2")); assertTrue(guaranteeService.isAllStarted()); } @@ -106,6 +112,7 @@ public void testIsNotAllCompleted() { @Test public void testIsAllCompleted() { when(jobNodeStorage.isJobNodeExisted("guarantee/completed")).thenReturn(true); + when(configService.getShardingTotalCount()).thenReturn(3); when(jobNodeStorage.getJobNodeChildrenKeys("guarantee/completed")).thenReturn(Arrays.asList("0", "1", "2")); assertTrue(guaranteeService.isAllCompleted()); } diff --git a/elastic-job-doc/content/post/release_notes.md b/elastic-job-doc/content/post/release_notes.md index 26a18b0af9..5b6799d818 100644 --- a/elastic-job-doc/content/post/release_notes.md +++ b/elastic-job-doc/content/post/release_notes.md @@ -19,8 +19,10 @@ weight=1 1. [ISSUE #84](https://github.com/dangdangdotcom/elastic-job/issues/84) 控制台提供作业启用/禁用按钮操作 1. [ISSUE #87](https://github.com/dangdangdotcom/elastic-job/issues/87) 调整主节点选举流程,作业关闭,禁用和暂停将触发主节点选举 +1. [ISSUE #93](https://github.com/dangdangdotcom/elastic-job/issues/93) 注册中心配置提供baseSleepTimeMilliseconds、maxSleepTimeMilliseconds和maxRetries的默认值 - +### 缺陷修正 +1. [ISSUE #92](https://github.com/dangdangdotcom/elastic-job/issues/92) 修改分片总数参数导致仅单一节点执行的监听抛出超时异常 ## 1.0.6 diff --git a/elastic-job-example/src/main/java/com/dangdang/example/elasticjob/fixture/repository/FooRepository.java b/elastic-job-example/src/main/java/com/dangdang/example/elasticjob/fixture/repository/FooRepository.java index 9a2ad31f50..3f8fc79a01 100644 --- a/elastic-job-example/src/main/java/com/dangdang/example/elasticjob/fixture/repository/FooRepository.java +++ b/elastic-job-example/src/main/java/com/dangdang/example/elasticjob/fixture/repository/FooRepository.java @@ -53,7 +53,7 @@ public List findActive(final List shardingItems) { private List findActive(final int shardingItem) { List result = new ArrayList<>(10); for (int i = 0; i < 10; i++) { - Foo foo = map.get((long) (shardingItem * 10 + i)); + Foo foo = map.get((shardingItem * 10 + i) % 100L); if (FooStatus.ACTIVE == foo.getStatus()) { result.add(foo); }