From e4a7466c40ad3932ed71235b4a659bd2d8ea43c9 Mon Sep 17 00:00:00 2001
From: terrymanu
* 可以根据不同的端口号启动多个Zookeeper服务. * 但每个相同的端口号共用一个服务实例. *
* - *- * 整个测试结束后, 随着JVM的关闭而关闭内嵌Zookeeper服务器. - * 之所以不调用close方法关闭Zookeeper容器并清理资源, 原因是Curator连接时会不停的扫描Zookeeper, 如果Zookeeper先于Curator关闭, Curator会不停的重连Zookeeper容器, 导致测试用例不能继续进行. - * 所以只能采用这种方式关闭, 目前已知的问题是: 测试的文件夹test_zk_data不能在测试结束后自动删除. - *
- * * @author zhangliang */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) public final class NestedZookeeperServers { - /** - * 内嵌Zookeeper的连接地址. - */ - public static final String ZK_CONNECTION_STRING = String.format("localhost:%s", NestedZookeeperServers.DEFAULT_PORT); - - private static final int DEFAULT_PORT = 3181; - - private static final String TEST_TEMP_DIRECTORY = String.format("target/test_zk_data/%s/", System.nanoTime()); - private static NestedZookeeperServers instance = new NestedZookeeperServers(); - private static ConcurrentMap* 如果该端口号的Zookeeper服务未启动, 则启动服务. * 如果该端口号的Zookeeper服务已启动, 则不做任何操作. *
*/ - public void startServerIfNotStarted() { - startServerIfNotStarted(DEFAULT_PORT); - } - - private synchronized void startServerIfNotStarted(final int port) { - if (!testingServers.containsKey(port)) { - TestingServer testingServer; + public synchronized void startServerIfNotStarted(final int port, final String dataDir) { + if (!nestedServers.containsKey(port)) { + TestingServer testingServer = null; try { - testingServer = new TestingServer(port, new File(TEST_TEMP_DIRECTORY + port)); + testingServer = new TestingServer(port, new File(dataDir)); // CHECKSTYLE:OFF } catch (final Exception ex) { // CHECKSTYLE:ON - throw new TestEnvironmentException(ex); + RegExceptionHandler.handleException(ex); } - testingServers.putIfAbsent(port, testingServer); + nestedServers.putIfAbsent(port, testingServer); } } /** - * 关闭内嵌的端口号为3181的Zookeeper服务. + * 关闭内嵌的Zookeeper服务. + * + * @param port 端口号 */ - public void closeServer() { + public void closeServer(final int port) { + TestingServer nestedServer = nestedServers.get(port); + if (null == nestedServer) { + return; + } try { - testingServers.get(DEFAULT_PORT).close(); + nestedServer.close(); + nestedServers.remove(port); } catch (final IOException ex) { - throw new TestEnvironmentException(ex); + RegExceptionHandler.handleException(ex); } } } diff --git a/elastic-job-core/src/main/java/com/dangdang/ddframe/reg/zookeeper/ZookeeperConfiguration.java b/elastic-job-core/src/main/java/com/dangdang/ddframe/reg/zookeeper/ZookeeperConfiguration.java index fec05b8590..6596b010c1 100644 --- a/elastic-job-core/src/main/java/com/dangdang/ddframe/reg/zookeeper/ZookeeperConfiguration.java +++ b/elastic-job-core/src/main/java/com/dangdang/ddframe/reg/zookeeper/ZookeeperConfiguration.java @@ -22,6 +22,7 @@ import lombok.Setter; import com.dangdang.ddframe.reg.base.AbstractRegistryCenterConfiguration; +import com.google.common.base.Strings; /** * 基于Zookeeper的注册中心配置. @@ -81,6 +82,17 @@ public class ZookeeperConfiguration extends AbstractRegistryCenterConfiguration */ private String digest; + /** + * 内嵌Zookeeper的端口号. + * -1表示不开启内嵌Zookeeper. + */ + private int nestedPort = -1; + + /** + * 内嵌Zookeeper的数据存储路径. + */ + private String nestedDataDir; + /** * 包含了必需属性的构造器. * @@ -97,4 +109,13 @@ public ZookeeperConfiguration(final String serverLists, final String namespace, this.maxSleepTimeMilliseconds = maxSleepTimeMilliseconds; this.maxRetries = maxRetries; } + + /** + * 判断是否需要开启内嵌Zookeeper. + * + * @return 是否需要开启内嵌Zookeeper + */ + public boolean isUseNestedZookeeper() { + return -1 != nestedPort && !Strings.isNullOrEmpty(nestedDataDir); + } } diff --git a/elastic-job-core/src/main/java/com/dangdang/ddframe/reg/zookeeper/ZookeeperRegistryCenter.java b/elastic-job-core/src/main/java/com/dangdang/ddframe/reg/zookeeper/ZookeeperRegistryCenter.java index 053f527630..2bba3472ce 100644 --- a/elastic-job-core/src/main/java/com/dangdang/ddframe/reg/zookeeper/ZookeeperRegistryCenter.java +++ b/elastic-job-core/src/main/java/com/dangdang/ddframe/reg/zookeeper/ZookeeperRegistryCenter.java @@ -70,6 +70,9 @@ public ZookeeperRegistryCenter(final ZookeeperConfiguration zkConfig) { } public void init() { + if (zkConfig.isUseNestedZookeeper()) { + NestedZookeeperServers.getInstance().startServerIfNotStarted(zkConfig.getNestedPort(), zkConfig.getNestedDataDir()); + } log.debug("Elastic job: zookeeper registry center init, server lists is: {}.", zkConfig.getServerLists()); Builder builder = CuratorFrameworkFactory.builder() .connectString(zkConfig.getServerLists()) @@ -143,6 +146,9 @@ public void close() { } waitForCacheClose(); CloseableUtils.closeQuietly(client); + if (zkConfig.isUseNestedZookeeper()) { + NestedZookeeperServers.getInstance().closeServer(zkConfig.getNestedPort()); + } } /* TODO 等待500ms, cache先关闭再关闭client, 否则会抛异常 diff --git a/elastic-job-core/src/test/java/com/dangdang/ddframe/AllTests.java b/elastic-job-core/src/test/java/com/dangdang/ddframe/AllTests.java index 6b440e6604..09853c4723 100644 --- a/elastic-job-core/src/test/java/com/dangdang/ddframe/AllTests.java +++ b/elastic-job-core/src/test/java/com/dangdang/ddframe/AllTests.java @@ -23,8 +23,9 @@ import org.junit.runners.Suite.SuiteClasses; import com.dangdang.ddframe.job.AllJobTests; +import com.dangdang.ddframe.reg.AbstractNestedZookeeperBaseTest; import com.dangdang.ddframe.reg.AllRegTests; -import com.dangdang.ddframe.test.NestedZookeeperServers; +import com.dangdang.ddframe.reg.zookeeper.NestedZookeeperServers; import lombok.AccessLevel; import lombok.NoArgsConstructor; @@ -39,6 +40,6 @@ public final class AllTests { @AfterClass public static void clear() { - NestedZookeeperServers.getInstance().closeServer(); + NestedZookeeperServers.getInstance().closeServer(AbstractNestedZookeeperBaseTest.PORT); } } diff --git a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/AbstractBaseStdJobTest.java b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/AbstractBaseStdJobTest.java index c6b45f5193..5dd253bcb7 100644 --- a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/AbstractBaseStdJobTest.java +++ b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/AbstractBaseStdJobTest.java @@ -36,18 +36,20 @@ import com.dangdang.ddframe.job.internal.schedule.JobRegistry; import com.dangdang.ddframe.job.internal.server.ServerStatus; import com.dangdang.ddframe.job.internal.statistics.ProcessCountStatistics; +import com.dangdang.ddframe.reg.AbstractNestedZookeeperBaseTest; import com.dangdang.ddframe.reg.base.CoordinatorRegistryCenter; import com.dangdang.ddframe.reg.zookeeper.ZookeeperConfiguration; import com.dangdang.ddframe.reg.zookeeper.ZookeeperRegistryCenter; -import com.dangdang.ddframe.test.NestedZookeeperServers; import lombok.AccessLevel; import lombok.Getter; -public abstract class AbstractBaseStdJobTest { +public abstract class AbstractBaseStdJobTest extends AbstractNestedZookeeperBaseTest { - protected static final CoordinatorRegistryCenter REG_CENTER = new ZookeeperRegistryCenter( - new ZookeeperConfiguration(NestedZookeeperServers.ZK_CONNECTION_STRING, "zkRegTestCenter", 1000, 3000, 3)); + private static ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(ZK_CONNECTION_STRING, "zkRegTestCenter", 1000, 3000, 3); + + @Getter(value = AccessLevel.PROTECTED) + private static CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zkConfig); @Getter(AccessLevel.PROTECTED) private final LocalHostService localHostService = new LocalHostService(); @@ -68,24 +70,25 @@ public abstract class AbstractBaseStdJobTest { protected AbstractBaseStdJobTest(final Class extends ElasticJob> elasticJobClass, final boolean disabled) { jobConfig = new JobConfiguration(jobName, elasticJobClass, 3, "0/1 * * * * ?"); - jobScheduler = new JobScheduler(REG_CENTER, jobConfig); + jobScheduler = new JobScheduler(regCenter, jobConfig); this.disabled = disabled; monitorPort = -1; - leaderElectionService = new LeaderElectionService(REG_CENTER, jobConfig); + leaderElectionService = new LeaderElectionService(regCenter, jobConfig); } protected AbstractBaseStdJobTest(final Class extends ElasticJob> elasticJobClass, final int monitorPort) { jobConfig = new JobConfiguration(jobName, elasticJobClass, 3, "0/1 * * * * ?"); - jobScheduler = new JobScheduler(REG_CENTER, jobConfig); + jobScheduler = new JobScheduler(regCenter, jobConfig); disabled = false; this.monitorPort = monitorPort; - leaderElectionService = new LeaderElectionService(REG_CENTER, jobConfig); + leaderElectionService = new LeaderElectionService(regCenter, jobConfig); } @BeforeClass public static void init() { - NestedZookeeperServers.getInstance().startServerIfNotStarted(); - REG_CENTER.init(); + zkConfig.setNestedPort(PORT); + zkConfig.setNestedDataDir(TEST_TEMP_DIRECTORY); + regCenter.init(); } @Before @@ -95,7 +98,7 @@ public void setUp() { jobConfig.setDisabled(disabled); jobConfig.setMonitorPort(monitorPort); jobConfig.setOverwrite(true); - REG_CENTER.init(); + regCenter.init(); } @After @@ -113,19 +116,19 @@ protected void initJob() { } protected void assertRegCenterCommonInfo() { - assertThat(REG_CENTER.get("/" + jobName + "/leader/election/host"), is(localHostService.getIp())); - assertThat(REG_CENTER.get("/" + jobName + "/config/shardingTotalCount"), is("3")); - assertThat(REG_CENTER.get("/" + jobName + "/config/shardingItemParameters"), is("0=A,1=B,2=C")); - assertThat(REG_CENTER.get("/" + jobName + "/config/cron"), is("0/1 * * * * ?")); - assertThat(REG_CENTER.get("/" + jobName + "/servers/" + localHostService.getIp() + "/hostName"), is(localHostService.getHostName())); + assertThat(regCenter.get("/" + jobName + "/leader/election/host"), is(localHostService.getIp())); + assertThat(regCenter.get("/" + jobName + "/config/shardingTotalCount"), is("3")); + assertThat(regCenter.get("/" + jobName + "/config/shardingItemParameters"), is("0=A,1=B,2=C")); + assertThat(regCenter.get("/" + jobName + "/config/cron"), is("0/1 * * * * ?")); + assertThat(regCenter.get("/" + jobName + "/servers/" + localHostService.getIp() + "/hostName"), is(localHostService.getHostName())); if (disabled) { - assertTrue(REG_CENTER.isExisted("/" + jobName + "/servers/" + localHostService.getIp() + "/disabled")); + assertTrue(regCenter.isExisted("/" + jobName + "/servers/" + localHostService.getIp() + "/disabled")); } else { - assertFalse(REG_CENTER.isExisted("/" + jobName + "/servers/" + localHostService.getIp() + "/disabled")); + assertFalse(regCenter.isExisted("/" + jobName + "/servers/" + localHostService.getIp() + "/disabled")); } - assertFalse(REG_CENTER.isExisted("/" + jobName + "/servers/" + localHostService.getIp() + "/stoped")); - assertThat(REG_CENTER.get("/" + jobName + "/servers/" + localHostService.getIp() + "/status"), is(ServerStatus.READY.name())); - REG_CENTER.remove("/" + jobName + "/leader/election"); + assertFalse(regCenter.isExisted("/" + jobName + "/servers/" + localHostService.getIp() + "/stoped")); + assertThat(regCenter.get("/" + jobName + "/servers/" + localHostService.getIp() + "/status"), is(ServerStatus.READY.name())); + regCenter.remove("/" + jobName + "/leader/election"); assertTrue(leaderElectionService.isLeader()); } } diff --git a/elastic-job-test/src/main/java/com/dangdang/ddframe/test/WaitingUtils.java b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/WaitingUtils.java similarity index 86% rename from elastic-job-test/src/main/java/com/dangdang/ddframe/test/WaitingUtils.java rename to elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/WaitingUtils.java index 06a030ef46..9899db8e1f 100644 --- a/elastic-job-test/src/main/java/com/dangdang/ddframe/test/WaitingUtils.java +++ b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/WaitingUtils.java @@ -15,21 +15,18 @@ * */ -package com.dangdang.ddframe.test; +package com.dangdang.ddframe.job.integrate; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +@NoArgsConstructor(access = AccessLevel.PRIVATE) public final class WaitingUtils { - private WaitingUtils() { - } - public static void waitingShortTime() { sleep(300L); } - public static void waitingLongTime() { - sleep(1500L); - } - private static void sleep(final long millis) { try { Thread.sleep(millis); diff --git a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/std/dataflow/sequence/OneOffSequenceDataFlowElasticJobTest.java b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/std/dataflow/sequence/OneOffSequenceDataFlowElasticJobTest.java index 6033a7b443..efce4e60eb 100644 --- a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/std/dataflow/sequence/OneOffSequenceDataFlowElasticJobTest.java +++ b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/std/dataflow/sequence/OneOffSequenceDataFlowElasticJobTest.java @@ -27,9 +27,9 @@ import com.dangdang.ddframe.job.api.JobConfiguration; import com.dangdang.ddframe.job.integrate.AbstractBaseStdJobAutoInitTest; +import com.dangdang.ddframe.job.integrate.WaitingUtils; import com.dangdang.ddframe.job.integrate.fixture.dataflow.sequence.OneOffSequenceDataFlowElasticJob; import com.dangdang.ddframe.job.internal.statistics.ProcessCountStatistics; -import com.dangdang.ddframe.test.WaitingUtils; public final class OneOffSequenceDataFlowElasticJobTest extends AbstractBaseStdJobAutoInitTest { @@ -53,7 +53,7 @@ public void assertJobInit() { while (!OneOffSequenceDataFlowElasticJob.isCompleted()) { WaitingUtils.waitingShortTime(); } - assertTrue(REG_CENTER.isExisted("/" + getJobName() + "/execution")); + assertTrue(getRegCenter().isExisted("/" + getJobName() + "/execution")); assertThat(ProcessCountStatistics.getProcessSuccessCount(getJobName()), is(30)); assertThat(ProcessCountStatistics.getProcessFailureCount(getJobName()), is(0)); } diff --git a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/std/dataflow/sequence/StreamingSequenceDataFlowElasticJobTest.java b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/std/dataflow/sequence/StreamingSequenceDataFlowElasticJobTest.java index 8384e7bfd2..ae7cb11d04 100644 --- a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/std/dataflow/sequence/StreamingSequenceDataFlowElasticJobTest.java +++ b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/std/dataflow/sequence/StreamingSequenceDataFlowElasticJobTest.java @@ -26,9 +26,9 @@ import org.junit.Test; import com.dangdang.ddframe.job.integrate.AbstractBaseStdJobAutoInitTest; +import com.dangdang.ddframe.job.integrate.WaitingUtils; import com.dangdang.ddframe.job.integrate.fixture.dataflow.sequence.StreamingSequenceDataFlowElasticJob; import com.dangdang.ddframe.job.internal.statistics.ProcessCountStatistics; -import com.dangdang.ddframe.test.WaitingUtils; public final class StreamingSequenceDataFlowElasticJobTest extends AbstractBaseStdJobAutoInitTest { @@ -47,7 +47,7 @@ public void assertJobInit() { while (!StreamingSequenceDataFlowElasticJob.isCompleted()) { WaitingUtils.waitingShortTime(); } - assertTrue(REG_CENTER.isExisted("/" + getJobName() + "/execution")); + assertTrue(getRegCenter().isExisted("/" + getJobName() + "/execution")); assertThat(ProcessCountStatistics.getProcessSuccessCount(getJobName()), is(30)); assertThat(ProcessCountStatistics.getProcessFailureCount(getJobName()), is(0)); } diff --git a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/std/dataflow/throughput/OneOffThroughputDataFlowElasticJobTest.java b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/std/dataflow/throughput/OneOffThroughputDataFlowElasticJobTest.java index ac7156605a..81fabd34e4 100644 --- a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/std/dataflow/throughput/OneOffThroughputDataFlowElasticJobTest.java +++ b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/std/dataflow/throughput/OneOffThroughputDataFlowElasticJobTest.java @@ -27,9 +27,9 @@ import com.dangdang.ddframe.job.api.JobConfiguration; import com.dangdang.ddframe.job.integrate.AbstractBaseStdJobAutoInitTest; +import com.dangdang.ddframe.job.integrate.WaitingUtils; import com.dangdang.ddframe.job.integrate.fixture.dataflow.throughput.OneOffThroughputDataFlowElasticJob; import com.dangdang.ddframe.job.internal.statistics.ProcessCountStatistics; -import com.dangdang.ddframe.test.WaitingUtils; public final class OneOffThroughputDataFlowElasticJobTest extends AbstractBaseStdJobAutoInitTest { @@ -53,7 +53,7 @@ public void assertJobInit() { while (!OneOffThroughputDataFlowElasticJob.isCompleted()) { WaitingUtils.waitingShortTime(); } - assertTrue(REG_CENTER.isExisted("/" + getJobName() + "/execution")); + assertTrue(getRegCenter().isExisted("/" + getJobName() + "/execution")); assertThat(ProcessCountStatistics.getProcessSuccessCount(getJobName()), is(10)); assertThat(ProcessCountStatistics.getProcessFailureCount(getJobName()), is(0)); } diff --git a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/std/dataflow/throughput/StreamingThroughputDataFlowElasticJobForExecuteFailureTest.java b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/std/dataflow/throughput/StreamingThroughputDataFlowElasticJobForExecuteFailureTest.java index cff8f440ba..ab21e4d711 100644 --- a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/std/dataflow/throughput/StreamingThroughputDataFlowElasticJobForExecuteFailureTest.java +++ b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/std/dataflow/throughput/StreamingThroughputDataFlowElasticJobForExecuteFailureTest.java @@ -27,9 +27,9 @@ import org.junit.Test; import com.dangdang.ddframe.job.integrate.AbstractBaseStdJobAutoInitTest; +import com.dangdang.ddframe.job.integrate.WaitingUtils; import com.dangdang.ddframe.job.integrate.fixture.dataflow.throughput.StreamingThroughputDataFlowElasticJobForExecuteFailure; import com.dangdang.ddframe.job.internal.statistics.ProcessCountStatistics; -import com.dangdang.ddframe.test.WaitingUtils; public final class StreamingThroughputDataFlowElasticJobForExecuteFailureTest extends AbstractBaseStdJobAutoInitTest { @@ -48,7 +48,7 @@ public void assertJobInit() { while (!StreamingThroughputDataFlowElasticJobForExecuteFailure.isCompleted()) { WaitingUtils.waitingShortTime(); } - assertTrue(REG_CENTER.isExisted("/" + getJobName() + "/execution")); + assertTrue(getRegCenter().isExisted("/" + getJobName() + "/execution")); assertThat(ProcessCountStatistics.getProcessSuccessCount(getJobName()), is(0)); assertThat(ProcessCountStatistics.getProcessFailureCount(getJobName()), not(0)); } diff --git a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/std/dataflow/throughput/StreamingThroughputDataFlowElasticJobForExecuteThrowsExceptionTest.java b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/std/dataflow/throughput/StreamingThroughputDataFlowElasticJobForExecuteThrowsExceptionTest.java index 3b0c479698..74d61875f6 100644 --- a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/std/dataflow/throughput/StreamingThroughputDataFlowElasticJobForExecuteThrowsExceptionTest.java +++ b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/std/dataflow/throughput/StreamingThroughputDataFlowElasticJobForExecuteThrowsExceptionTest.java @@ -27,9 +27,9 @@ import org.junit.Test; import com.dangdang.ddframe.job.integrate.AbstractBaseStdJobAutoInitTest; +import com.dangdang.ddframe.job.integrate.WaitingUtils; import com.dangdang.ddframe.job.integrate.fixture.dataflow.throughput.StreamingThroughputDataFlowElasticJobForExecuteThrowsException; import com.dangdang.ddframe.job.internal.statistics.ProcessCountStatistics; -import com.dangdang.ddframe.test.WaitingUtils; public final class StreamingThroughputDataFlowElasticJobForExecuteThrowsExceptionTest extends AbstractBaseStdJobAutoInitTest { @@ -48,7 +48,7 @@ public void assertJobInit() { while (!StreamingThroughputDataFlowElasticJobForExecuteThrowsException.isCompleted()) { WaitingUtils.waitingShortTime(); } - assertTrue(REG_CENTER.isExisted("/" + getJobName() + "/execution")); + assertTrue(getRegCenter().isExisted("/" + getJobName() + "/execution")); assertThat(ProcessCountStatistics.getProcessSuccessCount(getJobName()), is(0)); assertThat(ProcessCountStatistics.getProcessFailureCount(getJobName()), not(0)); } diff --git a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/std/dataflow/throughput/StreamingThroughputDataFlowElasticJobForMultipleThreadsTest.java b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/std/dataflow/throughput/StreamingThroughputDataFlowElasticJobForMultipleThreadsTest.java index c16370f842..368eec98ea 100644 --- a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/std/dataflow/throughput/StreamingThroughputDataFlowElasticJobForMultipleThreadsTest.java +++ b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/std/dataflow/throughput/StreamingThroughputDataFlowElasticJobForMultipleThreadsTest.java @@ -27,9 +27,9 @@ import com.dangdang.ddframe.job.api.JobConfiguration; import com.dangdang.ddframe.job.integrate.AbstractBaseStdJobAutoInitTest; +import com.dangdang.ddframe.job.integrate.WaitingUtils; import com.dangdang.ddframe.job.integrate.fixture.dataflow.throughput.StreamingThroughputDataFlowElasticJob; import com.dangdang.ddframe.job.internal.statistics.ProcessCountStatistics; -import com.dangdang.ddframe.test.WaitingUtils; public final class StreamingThroughputDataFlowElasticJobForMultipleThreadsTest extends AbstractBaseStdJobAutoInitTest { @@ -53,7 +53,7 @@ public void assertJobInit() { while (!StreamingThroughputDataFlowElasticJob.isCompleted()) { WaitingUtils.waitingShortTime(); } - assertTrue(REG_CENTER.isExisted("/" + getJobName() + "/execution")); + assertTrue(getRegCenter().isExisted("/" + getJobName() + "/execution")); assertThat(ProcessCountStatistics.getProcessSuccessCount(getJobName()), is(10)); assertThat(ProcessCountStatistics.getProcessFailureCount(getJobName()), is(0)); } diff --git a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/std/dataflow/throughput/StreamingThroughputDataFlowElasticJobForNotMonitorTest.java b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/std/dataflow/throughput/StreamingThroughputDataFlowElasticJobForNotMonitorTest.java index 04b1d37675..dca47d36e7 100644 --- a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/std/dataflow/throughput/StreamingThroughputDataFlowElasticJobForNotMonitorTest.java +++ b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/std/dataflow/throughput/StreamingThroughputDataFlowElasticJobForNotMonitorTest.java @@ -27,9 +27,9 @@ import com.dangdang.ddframe.job.api.JobConfiguration; import com.dangdang.ddframe.job.integrate.AbstractBaseStdJobAutoInitTest; +import com.dangdang.ddframe.job.integrate.WaitingUtils; import com.dangdang.ddframe.job.integrate.fixture.dataflow.throughput.StreamingThroughputDataFlowElasticJob; import com.dangdang.ddframe.job.internal.statistics.ProcessCountStatistics; -import com.dangdang.ddframe.test.WaitingUtils; public final class StreamingThroughputDataFlowElasticJobForNotMonitorTest extends AbstractBaseStdJobAutoInitTest { @@ -53,7 +53,7 @@ public void assertJobInit() { while (!StreamingThroughputDataFlowElasticJob.isCompleted()) { WaitingUtils.waitingShortTime(); } - assertFalse(REG_CENTER.isExisted("/" + getJobName() + "/execution")); + assertFalse(getRegCenter().isExisted("/" + getJobName() + "/execution")); assertThat(ProcessCountStatistics.getProcessSuccessCount(getJobName()), is(10)); assertThat(ProcessCountStatistics.getProcessFailureCount(getJobName()), is(0)); } diff --git a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/std/dataflow/throughput/StreamingThroughputDataFlowElasticJobForStopedTest.java b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/std/dataflow/throughput/StreamingThroughputDataFlowElasticJobForStopedTest.java index 5fb3d1f62c..ba7205586c 100644 --- a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/std/dataflow/throughput/StreamingThroughputDataFlowElasticJobForStopedTest.java +++ b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/std/dataflow/throughput/StreamingThroughputDataFlowElasticJobForStopedTest.java @@ -24,8 +24,8 @@ import org.junit.Test; import com.dangdang.ddframe.job.integrate.AbstractBaseStdJobAutoInitTest; +import com.dangdang.ddframe.job.integrate.WaitingUtils; import com.dangdang.ddframe.job.integrate.fixture.dataflow.throughput.StreamingThroughputDataFlowElasticJob; -import com.dangdang.ddframe.test.WaitingUtils; public final class StreamingThroughputDataFlowElasticJobForStopedTest extends AbstractBaseStdJobAutoInitTest { @@ -44,8 +44,8 @@ public void assertClearStopJobStatusWhenRestartingJob() { while (!StreamingThroughputDataFlowElasticJob.isCompleted()) { WaitingUtils.waitingShortTime(); } - REG_CENTER.persist("/" + getJobName() + "/servers/" + getLocalHostService().getIp() + "/stoped", ""); + getRegCenter().persist("/" + getJobName() + "/servers/" + getLocalHostService().getIp() + "/stoped", ""); initJob(); - assertFalse(REG_CENTER.isExisted("/" + getJobName() + "/servers/" + getLocalHostService().getIp() + "/stoped")); + assertFalse(getRegCenter().isExisted("/" + getJobName() + "/servers/" + getLocalHostService().getIp() + "/stoped")); } } diff --git a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/std/dataflow/throughput/StreamingThroughputDataFlowElasticJobTest.java b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/std/dataflow/throughput/StreamingThroughputDataFlowElasticJobTest.java index 846b60ab3e..06459a4ee9 100644 --- a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/std/dataflow/throughput/StreamingThroughputDataFlowElasticJobTest.java +++ b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/std/dataflow/throughput/StreamingThroughputDataFlowElasticJobTest.java @@ -26,9 +26,9 @@ import org.junit.Test; import com.dangdang.ddframe.job.integrate.AbstractBaseStdJobAutoInitTest; +import com.dangdang.ddframe.job.integrate.WaitingUtils; import com.dangdang.ddframe.job.integrate.fixture.dataflow.throughput.StreamingThroughputDataFlowElasticJob; import com.dangdang.ddframe.job.internal.statistics.ProcessCountStatistics; -import com.dangdang.ddframe.test.WaitingUtils; public final class StreamingThroughputDataFlowElasticJobTest extends AbstractBaseStdJobAutoInitTest { @@ -47,7 +47,7 @@ public void assertJobInit() { while (!StreamingThroughputDataFlowElasticJob.isCompleted()) { WaitingUtils.waitingShortTime(); } - assertTrue(REG_CENTER.isExisted("/" + getJobName() + "/execution")); + assertTrue(getRegCenter().isExisted("/" + getJobName() + "/execution")); assertThat(ProcessCountStatistics.getProcessSuccessCount(getJobName()), is(10)); assertThat(ProcessCountStatistics.getProcessFailureCount(getJobName()), is(0)); } diff --git a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/std/simple/SimpleElasticJobTest.java b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/std/simple/SimpleElasticJobTest.java index 768523a67e..2a3f72be4c 100644 --- a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/std/simple/SimpleElasticJobTest.java +++ b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/std/simple/SimpleElasticJobTest.java @@ -24,8 +24,8 @@ import org.junit.Test; import com.dangdang.ddframe.job.integrate.AbstractBaseStdJobAutoInitTest; +import com.dangdang.ddframe.job.integrate.WaitingUtils; import com.dangdang.ddframe.job.integrate.fixture.simple.SimpleElasticJob; -import com.dangdang.ddframe.test.WaitingUtils; public final class SimpleElasticJobTest extends AbstractBaseStdJobAutoInitTest { @@ -44,6 +44,6 @@ public void assertJobInit() { while (!SimpleElasticJob.isCompleted()) { WaitingUtils.waitingShortTime(); } - assertTrue(REG_CENTER.isExisted("/" + getJobName() + "/execution")); + assertTrue(getRegCenter().isExisted("/" + getJobName() + "/execution")); } } diff --git a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/monitor/MonitorServiceDisableTest.java b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/monitor/MonitorServiceDisableTest.java index ee16636c04..34fbc33e5f 100644 --- a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/monitor/MonitorServiceDisableTest.java +++ b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/monitor/MonitorServiceDisableTest.java @@ -23,7 +23,6 @@ import com.dangdang.ddframe.job.fixture.TestJob; import com.dangdang.ddframe.job.integrate.AbstractBaseStdJobTest; -import com.dangdang.ddframe.job.internal.monitor.MonitorService; public final class MonitorServiceDisableTest extends AbstractBaseStdJobTest { diff --git a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/monitor/MonitorServiceEnableTest.java b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/monitor/MonitorServiceEnableTest.java index 9702d98c2d..1bc663fa73 100644 --- a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/monitor/MonitorServiceEnableTest.java +++ b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/monitor/MonitorServiceEnableTest.java @@ -26,7 +26,6 @@ import com.dangdang.ddframe.job.fixture.TestJob; import com.dangdang.ddframe.job.integrate.AbstractBaseStdJobTest; -import com.dangdang.ddframe.job.internal.monitor.MonitorService; public final class MonitorServiceEnableTest extends AbstractBaseStdJobTest { diff --git a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/plugin/job/type/SimpleElasticJobTest.java b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/plugin/job/type/SimpleElasticJobTest.java index 139ed30d35..0be0c0cc2c 100644 --- a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/plugin/job/type/SimpleElasticJobTest.java +++ b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/plugin/job/type/SimpleElasticJobTest.java @@ -140,7 +140,9 @@ public void assertExecuteWhenRunOnceAndThrowException() throws JobExecutionExcep verify(executionService).cleanPreviousExecutionInfo(); verify(executionService).registerJobBegin(shardingContext); verify(jobCaller).process(); - verify(executionService, times(0)).registerJobCompleted(shardingContext); + verify(executionService).registerJobCompleted(shardingContext); + verify(configService, times(1)).isFailover(); + verify(failoverService, times(0)).updateFailoverComplete(shardingContext.getShardingItems()); } } diff --git a/elastic-job-test/src/main/java/com/dangdang/ddframe/test/TestEnvironmentException.java b/elastic-job-core/src/test/java/com/dangdang/ddframe/reg/AbstractNestedZookeeperBaseTest.java similarity index 57% rename from elastic-job-test/src/main/java/com/dangdang/ddframe/test/TestEnvironmentException.java rename to elastic-job-core/src/test/java/com/dangdang/ddframe/reg/AbstractNestedZookeeperBaseTest.java index 4e6629dd68..71d18d7ba4 100644 --- a/elastic-job-test/src/main/java/com/dangdang/ddframe/test/TestEnvironmentException.java +++ b/elastic-job-core/src/test/java/com/dangdang/ddframe/reg/AbstractNestedZookeeperBaseTest.java @@ -15,22 +15,15 @@ * */ -package com.dangdang.ddframe.test; +package com.dangdang.ddframe.reg; -/** - * 测试环境启动失败所抛出的异常. - * - * @author zhangliang - */ -public final class TestEnvironmentException extends RuntimeException { +import com.google.common.base.Joiner; + +public abstract class AbstractNestedZookeeperBaseTest { - private static final long serialVersionUID = 8253327513203871758L; + public static final int PORT = 3181; - public TestEnvironmentException(final Exception cause) { - super(cause); - } + public static final String TEST_TEMP_DIRECTORY = String.format("target/test_zk_data/%s/", System.nanoTime()); - public TestEnvironmentException(final String message, final Object... args) { - super(String.format(message, args)); - } + public static final String ZK_CONNECTION_STRING = Joiner.on(":").join("localhost", PORT); } diff --git a/elastic-job-core/src/test/java/com/dangdang/ddframe/reg/AllRegTests.java b/elastic-job-core/src/test/java/com/dangdang/ddframe/reg/AllRegTests.java index d4d42c1799..846749bc12 100644 --- a/elastic-job-core/src/test/java/com/dangdang/ddframe/reg/AllRegTests.java +++ b/elastic-job-core/src/test/java/com/dangdang/ddframe/reg/AllRegTests.java @@ -23,21 +23,27 @@ import com.dangdang.ddframe.reg.exception.LocalPropertiesFileNotFoundExceptionTest; import com.dangdang.ddframe.reg.exception.RegExceptionHandlerTest; +import com.dangdang.ddframe.reg.zookeeper.NestedZookeeperServersTest; +import com.dangdang.ddframe.reg.zookeeper.ZookeeperConfigurationTest; import com.dangdang.ddframe.reg.zookeeper.ZookeeperRegistryCenterForAuthTest; import com.dangdang.ddframe.reg.zookeeper.ZookeeperRegistryCenterForLocalPropertiesTest; import com.dangdang.ddframe.reg.zookeeper.ZookeeperRegistryCenterMiscellaneousTest; import com.dangdang.ddframe.reg.zookeeper.ZookeeperRegistryCenterModifyTest; +import com.dangdang.ddframe.reg.zookeeper.ZookeeperRegistryCenterNestedTest; import com.dangdang.ddframe.reg.zookeeper.ZookeeperRegistryCenterQueryWithCacheTest; import com.dangdang.ddframe.reg.zookeeper.ZookeeperRegistryCenterQueryWithoutCacheTest; @RunWith(Suite.class) @SuiteClasses({ + ZookeeperConfigurationTest.class, + NestedZookeeperServersTest.class, ZookeeperRegistryCenterForLocalPropertiesTest.class, ZookeeperRegistryCenterForAuthTest.class, ZookeeperRegistryCenterQueryWithCacheTest.class, ZookeeperRegistryCenterQueryWithoutCacheTest.class, ZookeeperRegistryCenterModifyTest.class, ZookeeperRegistryCenterMiscellaneousTest.class, + ZookeeperRegistryCenterNestedTest.class, RegExceptionHandlerTest.class, LocalPropertiesFileNotFoundExceptionTest.class }) diff --git a/elastic-job-core/src/test/java/com/dangdang/ddframe/reg/zookeeper/NestedZookeeperServersTest.java b/elastic-job-core/src/test/java/com/dangdang/ddframe/reg/zookeeper/NestedZookeeperServersTest.java new file mode 100644 index 0000000000..fd8fb3fa86 --- /dev/null +++ b/elastic-job-core/src/test/java/com/dangdang/ddframe/reg/zookeeper/NestedZookeeperServersTest.java @@ -0,0 +1,57 @@ +/** + * Copyright 1999-2015 dangdang.com. + *+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *
+ */ + +package com.dangdang.ddframe.reg.zookeeper; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.Map; + +import org.apache.curator.test.TestingServer; +import org.junit.Test; +import org.unitils.util.ReflectionUtils; + +public final class NestedZookeeperServersTest { + + private NestedZookeeperServers nestedZookeeperServers = NestedZookeeperServers.getInstance(); + + @Test + public void assertStartServerIfNotStarted() throws NoSuchFieldException, SecurityException { + nestedZookeeperServers.startServerIfNotStarted(5555, String.format("target/test_zk_data/5555/%s/", System.nanoTime())); + nestedZookeeperServers.startServerIfNotStarted(5555, String.format("target/test_zk_data/5555/%s/", System.nanoTime())); + assertTrue(getNestedServers().containsKey(5555)); + } + + @Test + public void assertCloseServerIfNotStarted() throws NoSuchFieldException, SecurityException { + nestedZookeeperServers.closeServer(6666); + assertFalse(getNestedServers().containsKey(6666)); + } + + @Test + public void assertCloseServerIfStarted() throws NoSuchFieldException, SecurityException { + nestedZookeeperServers.startServerIfNotStarted(7777, String.format("target/test_zk_data/7777/%s/", System.nanoTime())); + nestedZookeeperServers.closeServer(7777); + assertFalse(getNestedServers().containsKey(7777)); + } + + @SuppressWarnings("unchecked") + private Map+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *
+ */ + +package com.dangdang.ddframe.reg.zookeeper; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.junit.Test; + +public final class ZookeeperConfigurationTest { + + @Test + public void assertIsNotUseNestedZookeeperWhenPortIsNegative() throws Exception { + ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(); + assertFalse(zkConfig.isUseNestedZookeeper()); + } + + @Test + public void assertIsNotUseNestedZookeeperWhenDataDirIsNull() throws Exception { + ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(); + zkConfig.setNestedPort(3181); + assertFalse(zkConfig.isUseNestedZookeeper()); + } + + @Test + public void assertIsNotUseNestedZookeeperWhenDataDirIsEmpty() throws Exception { + ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(); + zkConfig.setNestedPort(3181); + zkConfig.setNestedDataDir(""); + assertFalse(zkConfig.isUseNestedZookeeper()); + } + + @Test + public void assertIsUseNestedZookeeper() throws Exception { + ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(); + zkConfig.setNestedPort(3181); + zkConfig.setNestedDataDir("target"); + assertTrue(zkConfig.isUseNestedZookeeper()); + } +} diff --git a/elastic-job-core/src/test/java/com/dangdang/ddframe/reg/zookeeper/ZookeeperRegistryCenterForAuthTest.java b/elastic-job-core/src/test/java/com/dangdang/ddframe/reg/zookeeper/ZookeeperRegistryCenterForAuthTest.java index 3c10b689dd..0cd7162012 100644 --- a/elastic-job-core/src/test/java/com/dangdang/ddframe/reg/zookeeper/ZookeeperRegistryCenterForAuthTest.java +++ b/elastic-job-core/src/test/java/com/dangdang/ddframe/reg/zookeeper/ZookeeperRegistryCenterForAuthTest.java @@ -28,17 +28,17 @@ import org.junit.Before; import org.junit.Test; -import com.dangdang.ddframe.test.NestedZookeeperServers; +import com.dangdang.ddframe.reg.AbstractNestedZookeeperBaseTest; -public final class ZookeeperRegistryCenterForAuthTest { +public final class ZookeeperRegistryCenterForAuthTest extends AbstractNestedZookeeperBaseTest { - private ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(NestedZookeeperServers.ZK_CONNECTION_STRING, ZookeeperRegistryCenterForAuthTest.class.getName(), 1000, 3000, 3); + private ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(ZK_CONNECTION_STRING, ZookeeperRegistryCenterForAuthTest.class.getName(), 1000, 3000, 3); private ZookeeperRegistryCenter zkRegCenter; @Before public void setUp() { - NestedZookeeperServers.getInstance().startServerIfNotStarted(); + NestedZookeeperServers.getInstance().startServerIfNotStarted(PORT, TEST_TEMP_DIRECTORY); zkConfig.setDigest("digest:password"); zkConfig.setLocalPropertiesPath("conf/reg/local.properties"); zkConfig.setSessionTimeoutMilliseconds(5000); @@ -56,7 +56,7 @@ public void assertInitWithDigestSuccess() throws Exception { zkRegCenter.init(); zkRegCenter.close(); CuratorFramework client = CuratorFrameworkFactory.builder() - .connectString(NestedZookeeperServers.ZK_CONNECTION_STRING) + .connectString(ZK_CONNECTION_STRING) .retryPolicy(new RetryOneTime(2000)) .authorization("digest", "digest:password".getBytes()).build(); client.start(); @@ -68,7 +68,7 @@ public void assertInitWithDigestSuccess() throws Exception { public void assertInitWithDigestFailure() throws Exception { zkRegCenter.init(); zkRegCenter.close(); - CuratorFramework client = CuratorFrameworkFactory.newClient(NestedZookeeperServers.ZK_CONNECTION_STRING, new RetryOneTime(2000)); + CuratorFramework client = CuratorFrameworkFactory.newClient(ZK_CONNECTION_STRING, new RetryOneTime(2000)); client.start(); client.blockUntilConnected(); client.getData().forPath("/" + ZookeeperRegistryCenterForAuthTest.class.getName() + "/test/deep/nested"); diff --git a/elastic-job-core/src/test/java/com/dangdang/ddframe/reg/zookeeper/ZookeeperRegistryCenterForLocalPropertiesTest.java b/elastic-job-core/src/test/java/com/dangdang/ddframe/reg/zookeeper/ZookeeperRegistryCenterForLocalPropertiesTest.java index c946f7277b..1002965f50 100644 --- a/elastic-job-core/src/test/java/com/dangdang/ddframe/reg/zookeeper/ZookeeperRegistryCenterForLocalPropertiesTest.java +++ b/elastic-job-core/src/test/java/com/dangdang/ddframe/reg/zookeeper/ZookeeperRegistryCenterForLocalPropertiesTest.java @@ -26,17 +26,17 @@ import org.junit.BeforeClass; import org.junit.Test; +import com.dangdang.ddframe.reg.AbstractNestedZookeeperBaseTest; import com.dangdang.ddframe.reg.exception.LocalPropertiesFileNotFoundException; import com.dangdang.ddframe.reg.exception.RegException; -import com.dangdang.ddframe.test.NestedZookeeperServers; -public final class ZookeeperRegistryCenterForLocalPropertiesTest { +public final class ZookeeperRegistryCenterForLocalPropertiesTest extends AbstractNestedZookeeperBaseTest { private ZookeeperRegistryCenter zkRegCenter; @BeforeClass public static void init() { - NestedZookeeperServers.getInstance().startServerIfNotStarted(); + NestedZookeeperServers.getInstance().startServerIfNotStarted(PORT, TEST_TEMP_DIRECTORY); } @Before @@ -87,11 +87,11 @@ public void assertInitForOverwriteEnabled() { } private ZookeeperRegistryCenter createZookeeperRegistryCenter() { - return new ZookeeperRegistryCenter(new ZookeeperConfiguration(NestedZookeeperServers.ZK_CONNECTION_STRING, getCurrentRunningMethodName(), 1000, 3000, 3)); + return new ZookeeperRegistryCenter(new ZookeeperConfiguration(ZK_CONNECTION_STRING, getCurrentRunningMethodName(), 1000, 3000, 3)); } private void createInitData() { - ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(NestedZookeeperServers.ZK_CONNECTION_STRING, getCurrentRunningMethodName(), 1000, 3000, 3); + ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(ZK_CONNECTION_STRING, getCurrentRunningMethodName(), 1000, 3000, 3); zkConfig.setLocalPropertiesPath("conf/reg/local.properties"); ZookeeperRegistryCenter zkRegCenter = new ZookeeperRegistryCenter(zkConfig); zkRegCenter.init(); diff --git a/elastic-job-core/src/test/java/com/dangdang/ddframe/reg/zookeeper/ZookeeperRegistryCenterMiscellaneousTest.java b/elastic-job-core/src/test/java/com/dangdang/ddframe/reg/zookeeper/ZookeeperRegistryCenterMiscellaneousTest.java index 4e696f2abb..1c8c2a2033 100644 --- a/elastic-job-core/src/test/java/com/dangdang/ddframe/reg/zookeeper/ZookeeperRegistryCenterMiscellaneousTest.java +++ b/elastic-job-core/src/test/java/com/dangdang/ddframe/reg/zookeeper/ZookeeperRegistryCenterMiscellaneousTest.java @@ -27,17 +27,17 @@ import org.junit.BeforeClass; import org.junit.Test; -import com.dangdang.ddframe.test.NestedZookeeperServers; +import com.dangdang.ddframe.reg.AbstractNestedZookeeperBaseTest; -public final class ZookeeperRegistryCenterMiscellaneousTest { +public final class ZookeeperRegistryCenterMiscellaneousTest extends AbstractNestedZookeeperBaseTest { - private static ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(NestedZookeeperServers.ZK_CONNECTION_STRING, ZookeeperRegistryCenterMiscellaneousTest.class.getName(), 1000, 3000, 3); + private static ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(ZK_CONNECTION_STRING, ZookeeperRegistryCenterMiscellaneousTest.class.getName(), 1000, 3000, 3); private static ZookeeperRegistryCenter zkRegCenter; @BeforeClass public static void setUp() { - NestedZookeeperServers.getInstance().startServerIfNotStarted(); + NestedZookeeperServers.getInstance().startServerIfNotStarted(PORT, TEST_TEMP_DIRECTORY); zkRegCenter = new ZookeeperRegistryCenter(zkConfig); zkRegCenter.init(); zkRegCenter.addCacheData("/test"); diff --git a/elastic-job-core/src/test/java/com/dangdang/ddframe/reg/zookeeper/ZookeeperRegistryCenterModifyTest.java b/elastic-job-core/src/test/java/com/dangdang/ddframe/reg/zookeeper/ZookeeperRegistryCenterModifyTest.java index 02c1399d51..3004c7ef61 100644 --- a/elastic-job-core/src/test/java/com/dangdang/ddframe/reg/zookeeper/ZookeeperRegistryCenterModifyTest.java +++ b/elastic-job-core/src/test/java/com/dangdang/ddframe/reg/zookeeper/ZookeeperRegistryCenterModifyTest.java @@ -33,17 +33,17 @@ import org.junit.BeforeClass; import org.junit.Test; -import com.dangdang.ddframe.test.NestedZookeeperServers; +import com.dangdang.ddframe.reg.AbstractNestedZookeeperBaseTest; -public final class ZookeeperRegistryCenterModifyTest { +public final class ZookeeperRegistryCenterModifyTest extends AbstractNestedZookeeperBaseTest { - private static ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(NestedZookeeperServers.ZK_CONNECTION_STRING, ZookeeperRegistryCenterModifyTest.class.getName(), 1000, 3000, 3); + private static ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(ZK_CONNECTION_STRING, ZookeeperRegistryCenterModifyTest.class.getName(), 1000, 3000, 3); private static ZookeeperRegistryCenter zkRegCenter; @BeforeClass public static void setUp() { - NestedZookeeperServers.getInstance().startServerIfNotStarted(); + NestedZookeeperServers.getInstance().startServerIfNotStarted(PORT, TEST_TEMP_DIRECTORY); zkRegCenter = new ZookeeperRegistryCenter(zkConfig); zkConfig.setLocalPropertiesPath("conf/reg/local.properties"); zkRegCenter.init(); @@ -76,7 +76,7 @@ public void assertPersistEphemeral() throws Exception { assertThat(zkRegCenter.get("/persist"), is("persist_value")); assertThat(zkRegCenter.get("/ephemeral"), is("ephemeral_value")); zkRegCenter.close(); - CuratorFramework client = CuratorFrameworkFactory.newClient(NestedZookeeperServers.ZK_CONNECTION_STRING, new RetryOneTime(2000)); + CuratorFramework client = CuratorFrameworkFactory.newClient(ZK_CONNECTION_STRING, new RetryOneTime(2000)); client.start(); client.blockUntilConnected(); assertThat(client.getData().forPath("/" + ZookeeperRegistryCenterModifyTest.class.getName() + "/persist"), is("persist_value".getBytes())); @@ -88,7 +88,7 @@ public void assertPersistEphemeral() throws Exception { public void assertPersistEphemeralSequential() throws Exception { zkRegCenter.persistEphemeralSequential("/sequential/test_sequential"); zkRegCenter.persistEphemeralSequential("/sequential/test_sequential"); - CuratorFramework client = CuratorFrameworkFactory.newClient(NestedZookeeperServers.ZK_CONNECTION_STRING, new RetryOneTime(2000)); + CuratorFramework client = CuratorFrameworkFactory.newClient(ZK_CONNECTION_STRING, new RetryOneTime(2000)); client.start(); client.blockUntilConnected(); List+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *
+ */ + +package com.dangdang.ddframe.reg.zookeeper; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.dangdang.ddframe.reg.AbstractNestedZookeeperBaseTest; + +public final class ZookeeperRegistryCenterNestedTest extends AbstractNestedZookeeperBaseTest { + + private static ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(ZK_CONNECTION_STRING, ZookeeperRegistryCenterNestedTest.class.getName(), 1000, 3000, 3); + + private static ZookeeperRegistryCenter zkRegCenter; + + @BeforeClass + public static void setUp() { + NestedZookeeperServers.getInstance().startServerIfNotStarted(PORT, TEST_TEMP_DIRECTORY); + zkRegCenter = new ZookeeperRegistryCenter(zkConfig); + zkConfig.setLocalPropertiesPath("conf/reg/local.properties"); + zkRegCenter.init(); + } + + @AfterClass + public static void tearDown() { + zkRegCenter.close(); + } + + @Test + public void assertInitWhenUse() { + zkRegCenter.persist("/test", "test_update"); + zkRegCenter.persist("/persist/new", "new_value"); + assertThat(zkRegCenter.get("/test"), is("test_update")); + assertThat(zkRegCenter.get("/persist/new"), is("new_value")); + } +} diff --git a/elastic-job-core/src/test/java/com/dangdang/ddframe/reg/zookeeper/ZookeeperRegistryCenterQueryWithCacheTest.java b/elastic-job-core/src/test/java/com/dangdang/ddframe/reg/zookeeper/ZookeeperRegistryCenterQueryWithCacheTest.java index f2dca87910..2a0c62a6a8 100644 --- a/elastic-job-core/src/test/java/com/dangdang/ddframe/reg/zookeeper/ZookeeperRegistryCenterQueryWithCacheTest.java +++ b/elastic-job-core/src/test/java/com/dangdang/ddframe/reg/zookeeper/ZookeeperRegistryCenterQueryWithCacheTest.java @@ -25,17 +25,17 @@ import org.junit.BeforeClass; import org.junit.Test; -import com.dangdang.ddframe.test.NestedZookeeperServers; +import com.dangdang.ddframe.reg.AbstractNestedZookeeperBaseTest; -public final class ZookeeperRegistryCenterQueryWithCacheTest { +public final class ZookeeperRegistryCenterQueryWithCacheTest extends AbstractNestedZookeeperBaseTest { - private static ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(NestedZookeeperServers.ZK_CONNECTION_STRING, ZookeeperRegistryCenterQueryWithCacheTest.class.getName(), 1000, 3000, 3); + private static ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(ZK_CONNECTION_STRING, ZookeeperRegistryCenterQueryWithCacheTest.class.getName(), 1000, 3000, 3); private static ZookeeperRegistryCenter zkRegCenter; @BeforeClass public static void setUp() { - NestedZookeeperServers.getInstance().startServerIfNotStarted(); + NestedZookeeperServers.getInstance().startServerIfNotStarted(PORT, TEST_TEMP_DIRECTORY); zkRegCenter = new ZookeeperRegistryCenter(zkConfig); zkConfig.setLocalPropertiesPath("conf/reg/local.properties"); zkRegCenter.init(); diff --git a/elastic-job-core/src/test/java/com/dangdang/ddframe/reg/zookeeper/ZookeeperRegistryCenterQueryWithoutCacheTest.java b/elastic-job-core/src/test/java/com/dangdang/ddframe/reg/zookeeper/ZookeeperRegistryCenterQueryWithoutCacheTest.java index 664d5b8e5f..e6b6051f39 100644 --- a/elastic-job-core/src/test/java/com/dangdang/ddframe/reg/zookeeper/ZookeeperRegistryCenterQueryWithoutCacheTest.java +++ b/elastic-job-core/src/test/java/com/dangdang/ddframe/reg/zookeeper/ZookeeperRegistryCenterQueryWithoutCacheTest.java @@ -29,18 +29,17 @@ import org.junit.BeforeClass; import org.junit.Test; -import com.dangdang.ddframe.test.NestedZookeeperServers; +import com.dangdang.ddframe.reg.AbstractNestedZookeeperBaseTest; -public final class ZookeeperRegistryCenterQueryWithoutCacheTest { +public final class ZookeeperRegistryCenterQueryWithoutCacheTest extends AbstractNestedZookeeperBaseTest { - private static ZookeeperConfiguration zkConfig = new ZookeeperConfiguration( - NestedZookeeperServers.ZK_CONNECTION_STRING, ZookeeperRegistryCenterQueryWithoutCacheTest.class.getName(), 1000, 3000, 3); + private static ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(ZK_CONNECTION_STRING, ZookeeperRegistryCenterQueryWithoutCacheTest.class.getName(), 1000, 3000, 3); private static ZookeeperRegistryCenter zkRegCenter; @BeforeClass public static void setUp() { - NestedZookeeperServers.getInstance().startServerIfNotStarted(); + NestedZookeeperServers.getInstance().startServerIfNotStarted(PORT, TEST_TEMP_DIRECTORY); zkConfig.setLocalPropertiesPath("conf/reg/local.properties"); zkRegCenter = new ZookeeperRegistryCenter(zkConfig); zkRegCenter.init(); diff --git a/elastic-job-example/src/main/java/com/dangdang/example/elasticjob/core/main/JobMain.java b/elastic-job-example/src/main/java/com/dangdang/example/elasticjob/core/main/JobMain.java index 1b98873bbf..5e18efbe90 100644 --- a/elastic-job-example/src/main/java/com/dangdang/example/elasticjob/core/main/JobMain.java +++ b/elastic-job-example/src/main/java/com/dangdang/example/elasticjob/core/main/JobMain.java @@ -28,7 +28,7 @@ public final class JobMain { - private final ZookeeperConfiguration zkConfig = new ZookeeperConfiguration("localhost:2181", "elasticjob-example", 1000, 3000, 3); + private final ZookeeperConfiguration zkConfig = new ZookeeperConfiguration("localhost:4181", "elasticjob-example", 1000, 3000, 3); private final CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zkConfig); @@ -45,6 +45,8 @@ public static void main(final String[] args) { } public void init() { + zkConfig.setNestedPort(4181); + zkConfig.setNestedDataDir(String.format("target/test_zk_data/%s/", System.nanoTime())); regCenter.init(); new JobScheduler(regCenter, jobConfig1).init(); new JobScheduler(regCenter, jobConfig2).init(); diff --git a/elastic-job-example/src/main/resources/META-INF/withNamespace.xml b/elastic-job-example/src/main/resources/META-INF/withNamespace.xml index 2c34cb254b..d7fa617b91 100644 --- a/elastic-job-example/src/main/resources/META-INF/withNamespace.xml +++ b/elastic-job-example/src/main/resources/META-INF/withNamespace.xml @@ -16,7 +16,7 @@