Skip to content

Commit

Permalink
fix #92
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed May 19, 2016
1 parent 31e2359 commit 9c66ec4
Show file tree
Hide file tree
Showing 7 changed files with 23 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
* @author zhangliang
*/
public final class JobTimeoutException extends JobException {


private static final long serialVersionUID = 315323919916960589L;

/**
* 作业超时抛出的异常.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package com.dangdang.ddframe.job.internal.guarantee;

import com.dangdang.ddframe.job.api.JobConfiguration;
import com.dangdang.ddframe.job.internal.config.ConfigurationService;
import com.dangdang.ddframe.job.internal.storage.JobNodeStorage;
import com.dangdang.ddframe.reg.base.CoordinatorRegistryCenter;

Expand All @@ -30,13 +31,13 @@
*/
public class GuaranteeService {

private final JobConfiguration jobConfiguration;

private final JobNodeStorage jobNodeStorage;

private final ConfigurationService configService;

public GuaranteeService(final CoordinatorRegistryCenter coordinatorRegistryCenter, final JobConfiguration jobConfiguration) {
this.jobConfiguration = jobConfiguration;
jobNodeStorage = new JobNodeStorage(coordinatorRegistryCenter, jobConfiguration);
configService = new ConfigurationService(coordinatorRegistryCenter, jobConfiguration);
}

/**
Expand All @@ -56,7 +57,7 @@ public void registerStart(final Collection<Integer> shardingItems) {
* @return 是否所有的任务均启动完毕
*/
public boolean isAllStarted() {
return jobNodeStorage.isJobNodeExisted(GuaranteeNode.STARTED_ROOT) && jobConfiguration.getShardingTotalCount() == jobNodeStorage.getJobNodeChildrenKeys(GuaranteeNode.STARTED_ROOT).size();
return jobNodeStorage.isJobNodeExisted(GuaranteeNode.STARTED_ROOT) && configService.getShardingTotalCount() == jobNodeStorage.getJobNodeChildrenKeys(GuaranteeNode.STARTED_ROOT).size();
}

/**
Expand All @@ -83,7 +84,7 @@ public void registerComplete(final Collection<Integer> shardingItems) {
* @return 是否所有的任务均执行完毕
*/
public boolean isAllCompleted() {
return jobNodeStorage.isJobNodeExisted(GuaranteeNode.COMPLETED_ROOT) && jobConfiguration.getShardingTotalCount() == jobNodeStorage.getJobNodeChildrenKeys(GuaranteeNode.COMPLETED_ROOT).size();
return jobNodeStorage.isJobNodeExisted(GuaranteeNode.COMPLETED_ROOT) && configService.getShardingTotalCount() == jobNodeStorage.getJobNodeChildrenKeys(GuaranteeNode.COMPLETED_ROOT).size();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@
import com.dangdang.ddframe.job.internal.election.LeaderElectionService;
import com.dangdang.ddframe.job.internal.env.LocalHostService;
import com.dangdang.ddframe.job.internal.execution.ExecutionService;
import com.dangdang.ddframe.job.internal.reg.BlockUtils;
import com.dangdang.ddframe.job.internal.reg.ItemUtils;
import com.dangdang.ddframe.job.internal.server.ServerService;
import com.dangdang.ddframe.job.internal.sharding.strategy.JobShardingStrategy;
import com.dangdang.ddframe.job.internal.sharding.strategy.JobShardingStrategyFactory;
import com.dangdang.ddframe.job.internal.sharding.strategy.JobShardingStrategyOption;
import com.dangdang.ddframe.job.internal.storage.JobNodePath;
import com.dangdang.ddframe.job.internal.storage.JobNodeStorage;
import com.dangdang.ddframe.job.internal.storage.TransactionExecutionCallback;
import com.dangdang.ddframe.job.internal.reg.BlockUtils;
import com.dangdang.ddframe.job.internal.reg.ItemUtils;
import com.dangdang.ddframe.reg.base.CoordinatorRegistryCenter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public List<ACL> getAclForPath(final String path) {
client = builder.build();
client.start();
try {
client.blockUntilConnected(zkConfig.getMaxSleepTimeMilliseconds(), TimeUnit.MILLISECONDS);
client.blockUntilConnected(zkConfig.getMaxSleepTimeMilliseconds() * zkConfig.getMaxRetries(), TimeUnit.MILLISECONDS);
if (!client.getZookeeperClient().isConnected()) {
throw new KeeperException.OperationTimeoutException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.dangdang.ddframe.job.api.JobConfiguration;
import com.dangdang.ddframe.job.fixture.TestJob;
import com.dangdang.ddframe.job.internal.config.ConfigurationService;
import com.dangdang.ddframe.job.internal.storage.JobNodeStorage;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -38,6 +39,9 @@ public final class GuaranteeServiceTest {
@Mock
private JobNodeStorage jobNodeStorage;

@Mock
private ConfigurationService configService;

private final JobConfiguration jobConfig = new JobConfiguration("testJob", TestJob.class, 3, "0/1 * * * * ?");

private final GuaranteeService guaranteeService = new GuaranteeService(null, jobConfig);
Expand All @@ -46,6 +50,7 @@ public final class GuaranteeServiceTest {
public void setUp() throws NoSuchFieldException {
MockitoAnnotations.initMocks(this);
ReflectionUtils.setFieldValue(guaranteeService, "jobNodeStorage", jobNodeStorage);
ReflectionUtils.setFieldValue(guaranteeService, "configService", configService);
when(jobNodeStorage.getJobConfiguration()).thenReturn(jobConfig);
jobConfig.setOverwrite(true);
}
Expand Down Expand Up @@ -73,6 +78,7 @@ public void testIsNotAllStarted() {
@Test
public void testIsAllStarted() {
when(jobNodeStorage.isJobNodeExisted("guarantee/started")).thenReturn(true);
when(configService.getShardingTotalCount()).thenReturn(3);
when(jobNodeStorage.getJobNodeChildrenKeys("guarantee/started")).thenReturn(Arrays.asList("0", "1", "2"));
assertTrue(guaranteeService.isAllStarted());
}
Expand Down Expand Up @@ -106,6 +112,7 @@ public void testIsNotAllCompleted() {
@Test
public void testIsAllCompleted() {
when(jobNodeStorage.isJobNodeExisted("guarantee/completed")).thenReturn(true);
when(configService.getShardingTotalCount()).thenReturn(3);
when(jobNodeStorage.getJobNodeChildrenKeys("guarantee/completed")).thenReturn(Arrays.asList("0", "1", "2"));
assertTrue(guaranteeService.isAllCompleted());
}
Expand Down
4 changes: 3 additions & 1 deletion elastic-job-doc/content/post/release_notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ weight=1

1. [ISSUE #84](https://github.com/dangdangdotcom/elastic-job/issues/84) 控制台提供作业启用/禁用按钮操作
1. [ISSUE #87](https://github.com/dangdangdotcom/elastic-job/issues/87) 调整主节点选举流程,作业关闭,禁用和暂停将触发主节点选举
1. [ISSUE #93](https://github.com/dangdangdotcom/elastic-job/issues/93) 注册中心配置提供baseSleepTimeMilliseconds、maxSleepTimeMilliseconds和maxRetries的默认值


### 缺陷修正
1. [ISSUE #92](https://github.com/dangdangdotcom/elastic-job/issues/92) 修改分片总数参数导致仅单一节点执行的监听抛出超时异常

## 1.0.6

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public List<Foo> findActive(final List<Integer> shardingItems) {
private List<Foo> findActive(final int shardingItem) {
List<Foo> result = new ArrayList<>(10);
for (int i = 0; i < 10; i++) {
Foo foo = map.get((long) (shardingItem * 10 + i));
Foo foo = map.get((shardingItem * 10 + i) % 100L);
if (FooStatus.ACTIVE == foo.getStatus()) {
result.add(foo);
}
Expand Down

0 comments on commit 9c66ec4

Please sign in to comment.