Skip to content

Commit

Permalink
fix #246
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Mar 21, 2017
1 parent 686ce2f commit 1b2b4b9
Show file tree
Hide file tree
Showing 9 changed files with 57 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ public interface CoordinatorRegistryCenter extends RegistryCenter {
*/
void addCacheData(String cachePath);

/**
* 释放本地缓存.
*
* @param cachePath 需释放缓存的路径
*/
void evictCacheData(String cachePath);

/**
* 获取注册中心数据缓存对象.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,14 @@ public void addCacheData(final String cachePath) {
caches.put(cachePath + "/", cache);
}

@Override
public void evictCacheData(final String cachePath) {
TreeCache cache = caches.remove(cachePath + "/");
if (null != cache) {
cache.close();
}
}

@Override
public Object getRawCache(final String cachePath) {
return caches.get(cachePath + "/");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ private Properties getBaseQuartzProperties(final boolean isMisfire) {
* 停止作业调度.
*/
public void shutdown() {
jobRegistry.getJobScheduleController(jobName).shutdown();
jobRegistry.removeJobScheduleController(jobName).shutdown();
jobExecutor.close();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,12 @@ public void init() {
regCenter.addCacheData("/" + liteJobConfig.getJobName());
schedulerFacade.registerStartUpInfo(liteJobConfig);
}

/**
* 关闭作业.
*/
public void close() {
log.debug("Job '{}' controller close.", liteJobConfig.getJobName());
regCenter.evictCacheData("/" + liteJobConfig.getJobName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,16 @@ public JobScheduleController getJobScheduleController(final String jobName) {
return schedulerMap.get(jobName);
}

/**
* 删除作业调度控制器.
*
* @param jobName 作业名称
* @return 删除的作业调度控制器
*/
public JobScheduleController removeJobScheduleController(final String jobName) {
return schedulerMap.remove(jobName);
}

/**
* 添加作业实例主键.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,8 @@ class PersistShardingInfoTransactionExecutionCallback implements TransactionExec
public void execute(final CuratorTransactionFinal curatorTransactionFinal) throws Exception {
for (JobShardingResult each : shardingResults) {
if (!each.getShardingItems().isEmpty()) {
curatorTransactionFinal.create().forPath(jobNodePath.getFullPath(ShardingNode.getShardingNode(each.getJobShardingUnit().getServerIp(), each.getJobShardingUnit().getJobInstanceId())),
curatorTransactionFinal.create().forPath(
jobNodePath.getFullPath(ShardingNode.getShardingNode(each.getJobShardingUnit().getServerIp(), each.getJobShardingUnit().getJobInstanceId())),
ShardingItems.toItemsString(each.getShardingItems()).getBytes()).and();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.quartz.SchedulerException;
import org.unitils.util.ReflectionUtils;

import static org.junit.Assert.assertNotNull;
Expand Down Expand Up @@ -59,18 +58,24 @@ public void initMocks() throws NoSuchFieldException {
}

@Test
public void assertNew() throws NoSuchFieldException {
public void assertNew() {
TestDistributeOnceElasticJobListener testDistributeOnceElasticJobListener = new TestDistributeOnceElasticJobListener(caller);
assertNull(ReflectionUtils.getFieldValue(testDistributeOnceElasticJobListener, ReflectionUtils.getFieldWithName(AbstractDistributeOnceElasticJobListener.class, "guaranteeService", false)));
new JobExecutor(null, liteJobConfig, new TestElasticJobListener(caller), testDistributeOnceElasticJobListener);
assertNotNull(ReflectionUtils.getFieldValue(testDistributeOnceElasticJobListener, ReflectionUtils.getFieldWithName(AbstractDistributeOnceElasticJobListener.class, "guaranteeService", false)));
}

@Test
public void assertInit() throws NoSuchFieldException, SchedulerException {
public void assertInit() {
jobExecutor.init();
verify(schedulerFacade).clearPreviousServerStatus();
verify(regCenter).addCacheData("/test_job");
verify(schedulerFacade).registerStartUpInfo(liteJobConfig);
}

@Test
public void assertClose() {
jobExecutor.close();
verify(regCenter).evictCacheData("/test_job");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.junit.Test;

import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;

Expand All @@ -28,7 +29,15 @@ public final class JobRegistryTest {
@Test
public void assertAddJobScheduler() {
JobScheduleController jobScheduleController = mock(JobScheduleController.class);
JobRegistry.getInstance().addJobScheduleController("test_job_AddJobScheduler", jobScheduleController);
assertThat(JobRegistry.getInstance().getJobScheduleController("test_job_AddJobScheduler"), is(jobScheduleController));
JobRegistry.getInstance().addJobScheduleController("test_job_scheduler_for_add", jobScheduleController);
assertThat(JobRegistry.getInstance().getJobScheduleController("test_job_scheduler_for_add"), is(jobScheduleController));
}

@Test
public void assertRemoveJobScheduleController() {
JobScheduleController jobScheduleController = mock(JobScheduleController.class);
JobRegistry.getInstance().addJobScheduleController("test_job_scheduler_for_remove", jobScheduleController);
assertThat(JobRegistry.getInstance().removeJobScheduleController("test_job_scheduler_for_remove"), is(jobScheduleController));
assertNull(JobRegistry.getInstance().getJobScheduleController("test_job_scheduler_for_add"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,7 @@ public Collection<String> remove(final Optional<String> jobName, final Optional<
@Override
public boolean doOperate(final String jobName, final String serverIp) {
JobNodePath jobNodePath = new JobNodePath(jobName);
if (regCenter.isExisted(jobNodePath.getServerNodePath(serverIp, JobNodePath.STATUS_NODE))
|| regCenter.isExisted(jobNodePath.getLeaderHostNodePath())) {
if (regCenter.isExisted(jobNodePath.getServerNodePath(serverIp, JobNodePath.STATUS_NODE)) || regCenter.isExisted(jobNodePath.getLeaderHostNodePath())) {
return false;
}
regCenter.remove(jobNodePath.getServerNodePath(serverIp));
Expand Down

0 comments on commit 1b2b4b9

Please sign in to comment.