From 59c7819f929d600cec3870d9431be846006d512b Mon Sep 17 00:00:00 2001 From: beiwei30 Date: Tue, 15 May 2018 10:38:42 +0800 Subject: [PATCH 1/2] reformat the code, and move the test into the correct package --- .../{ => support}/AbortPolicyWithReportTest.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) rename dubbo-common/src/test/java/com/alibaba/dubbo/common/threadpool/{ => support}/AbortPolicyWithReportTest.java (89%) diff --git a/dubbo-common/src/test/java/com/alibaba/dubbo/common/threadpool/AbortPolicyWithReportTest.java b/dubbo-common/src/test/java/com/alibaba/dubbo/common/threadpool/support/AbortPolicyWithReportTest.java similarity index 89% rename from dubbo-common/src/test/java/com/alibaba/dubbo/common/threadpool/AbortPolicyWithReportTest.java rename to dubbo-common/src/test/java/com/alibaba/dubbo/common/threadpool/support/AbortPolicyWithReportTest.java index 293d7ab8871..27d616f6eed 100644 --- a/dubbo-common/src/test/java/com/alibaba/dubbo/common/threadpool/AbortPolicyWithReportTest.java +++ b/dubbo-common/src/test/java/com/alibaba/dubbo/common/threadpool/support/AbortPolicyWithReportTest.java @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.alibaba.dubbo.common.threadpool; +package com.alibaba.dubbo.common.threadpool.support; import com.alibaba.dubbo.common.URL; import com.alibaba.dubbo.common.threadpool.support.AbortPolicyWithReport; @@ -37,11 +37,11 @@ public void run() { System.out.println("hello"); } }, (ThreadPoolExecutor) Executors.newFixedThreadPool(1)); - }catch (RejectedExecutionException rj){ - + } catch (RejectedExecutionException rj) { + // ignore } - Thread.currentThread().sleep(1000); + Thread.sleep(1000); } } \ No newline at end of file From c35c34b096fc028b1fb538e7a3a072d81d1d9b43 Mon Sep 17 00:00:00 2001 From: beiwei30 Date: Tue, 15 May 2018 14:50:42 +0800 Subject: [PATCH 2/2] unit test for c.a.d.c.threadpool --- .../support/cached/CachedThreadPoolTest.java | 80 +++++++++++++++++ .../support/eager/EagerThreadPoolTest.java | 82 +++++++++++++++++ .../support/eager/TaskQueueTest.java | 89 +++++++++++++++++++ .../support/fixed/FixedThreadPoolTest.java | 81 +++++++++++++++++ .../limited/LimitedThreadPoolTest.java | 79 ++++++++++++++++ 5 files changed, 411 insertions(+) create mode 100644 dubbo-common/src/test/java/com/alibaba/dubbo/common/threadpool/support/cached/CachedThreadPoolTest.java create mode 100644 dubbo-common/src/test/java/com/alibaba/dubbo/common/threadpool/support/eager/EagerThreadPoolTest.java create mode 100644 dubbo-common/src/test/java/com/alibaba/dubbo/common/threadpool/support/eager/TaskQueueTest.java create mode 100644 dubbo-common/src/test/java/com/alibaba/dubbo/common/threadpool/support/fixed/FixedThreadPoolTest.java create mode 100644 dubbo-common/src/test/java/com/alibaba/dubbo/common/threadpool/support/limited/LimitedThreadPoolTest.java diff --git a/dubbo-common/src/test/java/com/alibaba/dubbo/common/threadpool/support/cached/CachedThreadPoolTest.java b/dubbo-common/src/test/java/com/alibaba/dubbo/common/threadpool/support/cached/CachedThreadPoolTest.java new file mode 100644 index 00000000000..63c7c93b741 --- /dev/null +++ b/dubbo-common/src/test/java/com/alibaba/dubbo/common/threadpool/support/cached/CachedThreadPoolTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.dubbo.common.threadpool.support.cached; + +import com.alibaba.dubbo.common.Constants; +import com.alibaba.dubbo.common.URL; +import com.alibaba.dubbo.common.threadlocal.InternalThread; +import com.alibaba.dubbo.common.threadpool.ThreadPool; +import com.alibaba.dubbo.common.threadpool.support.AbortPolicyWithReport; +import org.hamcrest.Matchers; +import org.junit.Test; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.startsWith; +import static org.junit.Assert.assertThat; + +public class CachedThreadPoolTest { + @Test + public void getExecutor1() throws Exception { + URL url = URL.valueOf("dubbo://10.20.130.230:20880/context/path?" + + Constants.THREAD_NAME_KEY + "=demo&" + + Constants.CORE_THREADS_KEY + "=1&" + + Constants.THREADS_KEY + "=2&" + + Constants.ALIVE_KEY + "=1000&" + + Constants.QUEUES_KEY + "=0"); + ThreadPool threadPool = new CachedThreadPool(); + ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor(url); + assertThat(executor.getCorePoolSize(), is(1)); + assertThat(executor.getMaximumPoolSize(), is(2)); + assertThat(executor.getQueue(), Matchers.>instanceOf(SynchronousQueue.class)); + assertThat(executor.getRejectedExecutionHandler(), + Matchers.instanceOf(AbortPolicyWithReport.class)); + + final CountDownLatch latch = new CountDownLatch(1); + executor.execute(new Runnable() { + @Override + public void run() { + Thread thread = Thread.currentThread(); + assertThat(thread, instanceOf(InternalThread.class)); + assertThat(thread.getName(), startsWith("demo")); + latch.countDown(); + } + }); + + latch.await(5000, TimeUnit.MICROSECONDS); + assertThat(latch.getCount(), is(0L)); + } + + @Test + public void getExecutor2() throws Exception { + URL url = URL.valueOf("dubbo://10.20.130.230:20880/context/path?" + Constants.QUEUES_KEY + "=1"); + ThreadPool threadPool = new CachedThreadPool(); + ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor(url); + assertThat(executor.getQueue(), Matchers.>instanceOf(LinkedBlockingQueue.class)); + } +} diff --git a/dubbo-common/src/test/java/com/alibaba/dubbo/common/threadpool/support/eager/EagerThreadPoolTest.java b/dubbo-common/src/test/java/com/alibaba/dubbo/common/threadpool/support/eager/EagerThreadPoolTest.java new file mode 100644 index 00000000000..a7f240896d3 --- /dev/null +++ b/dubbo-common/src/test/java/com/alibaba/dubbo/common/threadpool/support/eager/EagerThreadPoolTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.dubbo.common.threadpool.support.eager; + +import com.alibaba.dubbo.common.Constants; +import com.alibaba.dubbo.common.URL; +import com.alibaba.dubbo.common.threadlocal.InternalThread; +import com.alibaba.dubbo.common.threadpool.ThreadPool; +import com.alibaba.dubbo.common.threadpool.support.AbortPolicyWithReport; +import org.hamcrest.Matchers; +import org.junit.Test; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.startsWith; +import static org.junit.Assert.assertThat; + +public class EagerThreadPoolTest { + @Test + public void getExecutor1() throws Exception { + URL url = URL.valueOf("dubbo://10.20.130.230:20880/context/path?" + + Constants.THREAD_NAME_KEY + "=demo&" + + Constants.CORE_THREADS_KEY + "=1&" + + Constants.THREADS_KEY + "=2&" + + Constants.ALIVE_KEY + "=1000&" + + Constants.QUEUES_KEY + "=0"); + ThreadPool threadPool = new EagerThreadPool(); + ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor(url); + assertThat(executor, instanceOf(EagerThreadPoolExecutor.class)); + assertThat(executor.getCorePoolSize(), is(1)); + assertThat(executor.getMaximumPoolSize(), is(2)); + assertThat(executor.getKeepAliveTime(TimeUnit.MILLISECONDS), is(1000L)); + assertThat(executor.getQueue().remainingCapacity(), is(1)); + assertThat(executor.getQueue(), Matchers.>instanceOf(TaskQueue.class)); + assertThat(executor.getRejectedExecutionHandler(), + Matchers.instanceOf(AbortPolicyWithReport.class)); + + final CountDownLatch latch = new CountDownLatch(1); + executor.execute(new Runnable() { + @Override + public void run() { + Thread thread = Thread.currentThread(); + assertThat(thread, instanceOf(InternalThread.class)); + assertThat(thread.getName(), startsWith("demo")); + latch.countDown(); + } + }); + + latch.await(5000, TimeUnit.MICROSECONDS); + assertThat(latch.getCount(), is(0L)); + } + + @Test + public void getExecutor2() throws Exception { + URL url = URL.valueOf("dubbo://10.20.130.230:20880/context/path?" + Constants.QUEUES_KEY + "=2"); + ThreadPool threadPool = new EagerThreadPool(); + ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor(url); + assertThat(executor.getQueue().remainingCapacity(), is(2)); + } + +} diff --git a/dubbo-common/src/test/java/com/alibaba/dubbo/common/threadpool/support/eager/TaskQueueTest.java b/dubbo-common/src/test/java/com/alibaba/dubbo/common/threadpool/support/eager/TaskQueueTest.java new file mode 100644 index 00000000000..2f70398633d --- /dev/null +++ b/dubbo-common/src/test/java/com/alibaba/dubbo/common/threadpool/support/eager/TaskQueueTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.dubbo.common.threadpool.support.eager; + +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.mock; + +public class TaskQueueTest { + + @Test(expected = RejectedExecutionException.class) + public void testOffer1() throws Exception { + TaskQueue queue = new TaskQueue(1); + queue.offer(mock(Runnable.class)); + } + + @Test + public void testOffer2() throws Exception { + TaskQueue queue = new TaskQueue(1); + EagerThreadPoolExecutor executor = mock(EagerThreadPoolExecutor.class); + Mockito.when(executor.getPoolSize()).thenReturn(2); + Mockito.when(executor.getSubmittedTaskCount()).thenReturn(1); + queue.setExecutor(executor); + assertThat(queue.offer(mock(Runnable.class)), is(true)); + } + + @Test + public void testOffer3() throws Exception { + TaskQueue queue = new TaskQueue(1); + EagerThreadPoolExecutor executor = mock(EagerThreadPoolExecutor.class); + Mockito.when(executor.getPoolSize()).thenReturn(2); + Mockito.when(executor.getSubmittedTaskCount()).thenReturn(2); + Mockito.when(executor.getMaximumPoolSize()).thenReturn(4); + queue.setExecutor(executor); + assertThat(queue.offer(mock(Runnable.class)), is(false)); + } + + @Test + public void testOffer4() throws Exception { + TaskQueue queue = new TaskQueue(1); + EagerThreadPoolExecutor executor = mock(EagerThreadPoolExecutor.class); + Mockito.when(executor.getPoolSize()).thenReturn(4); + Mockito.when(executor.getSubmittedTaskCount()).thenReturn(4); + Mockito.when(executor.getMaximumPoolSize()).thenReturn(4); + queue.setExecutor(executor); + assertThat(queue.offer(mock(Runnable.class)), is(true)); + } + + @Test(expected = RejectedExecutionException.class) + public void testRetryOffer1() throws Exception { + TaskQueue queue = new TaskQueue(1); + EagerThreadPoolExecutor executor = mock(EagerThreadPoolExecutor.class); + Mockito.when(executor.isShutdown()).thenReturn(true); + queue.setExecutor(executor); + queue.retryOffer(mock(Runnable.class), 1000, TimeUnit.MILLISECONDS); + } + + + @Test + public void testRetryOffer2() throws Exception { + TaskQueue queue = new TaskQueue(1); + EagerThreadPoolExecutor executor = mock(EagerThreadPoolExecutor.class); + Mockito.when(executor.isShutdown()).thenReturn(false); + queue.setExecutor(executor); + assertThat(queue.retryOffer(mock(Runnable.class), 1000, TimeUnit.MILLISECONDS), is(true)); + } + +} diff --git a/dubbo-common/src/test/java/com/alibaba/dubbo/common/threadpool/support/fixed/FixedThreadPoolTest.java b/dubbo-common/src/test/java/com/alibaba/dubbo/common/threadpool/support/fixed/FixedThreadPoolTest.java new file mode 100644 index 00000000000..faefe6f4d57 --- /dev/null +++ b/dubbo-common/src/test/java/com/alibaba/dubbo/common/threadpool/support/fixed/FixedThreadPoolTest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.dubbo.common.threadpool.support.fixed; + +import com.alibaba.dubbo.common.Constants; +import com.alibaba.dubbo.common.URL; +import com.alibaba.dubbo.common.threadlocal.InternalThread; +import com.alibaba.dubbo.common.threadpool.ThreadPool; +import com.alibaba.dubbo.common.threadpool.support.AbortPolicyWithReport; +import com.alibaba.dubbo.common.threadpool.support.limited.LimitedThreadPool; +import org.hamcrest.Matchers; +import org.junit.Test; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.startsWith; +import static org.junit.Assert.assertThat; + +public class FixedThreadPoolTest { + @Test + public void getExecutor1() throws Exception { + URL url = URL.valueOf("dubbo://10.20.130.230:20880/context/path?" + + Constants.THREAD_NAME_KEY + "=demo&" + + Constants.CORE_THREADS_KEY + "=1&" + + Constants.THREADS_KEY + "=2&" + + Constants.QUEUES_KEY + "=0"); + ThreadPool threadPool = new FixedThreadPool(); + ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor(url); + assertThat(executor.getCorePoolSize(), is(2)); + assertThat(executor.getMaximumPoolSize(), is(2)); + assertThat(executor.getKeepAliveTime(TimeUnit.MILLISECONDS), is(0L)); + assertThat(executor.getQueue(), Matchers.>instanceOf(SynchronousQueue.class)); + assertThat(executor.getRejectedExecutionHandler(), + Matchers.instanceOf(AbortPolicyWithReport.class)); + + final CountDownLatch latch = new CountDownLatch(1); + executor.execute(new Runnable() { + @Override + public void run() { + Thread thread = Thread.currentThread(); + assertThat(thread, instanceOf(InternalThread.class)); + assertThat(thread.getName(), startsWith("demo")); + latch.countDown(); + } + }); + + latch.await(5000, TimeUnit.MICROSECONDS); + assertThat(latch.getCount(), is(0L)); + } + + @Test + public void getExecutor2() throws Exception { + URL url = URL.valueOf("dubbo://10.20.130.230:20880/context/path?" + Constants.QUEUES_KEY + "=1"); + ThreadPool threadPool = new FixedThreadPool(); + ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor(url); + assertThat(executor.getQueue(), Matchers.>instanceOf(LinkedBlockingQueue.class)); + } +} diff --git a/dubbo-common/src/test/java/com/alibaba/dubbo/common/threadpool/support/limited/LimitedThreadPoolTest.java b/dubbo-common/src/test/java/com/alibaba/dubbo/common/threadpool/support/limited/LimitedThreadPoolTest.java new file mode 100644 index 00000000000..b99ce6c8cc2 --- /dev/null +++ b/dubbo-common/src/test/java/com/alibaba/dubbo/common/threadpool/support/limited/LimitedThreadPoolTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.dubbo.common.threadpool.support.limited; + +import com.alibaba.dubbo.common.Constants; +import com.alibaba.dubbo.common.URL; +import com.alibaba.dubbo.common.threadlocal.InternalThread; +import com.alibaba.dubbo.common.threadpool.ThreadPool; +import com.alibaba.dubbo.common.threadpool.support.AbortPolicyWithReport; +import org.hamcrest.Matchers; +import org.junit.Test; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.startsWith; +import static org.junit.Assert.assertThat; + +public class LimitedThreadPoolTest { + @Test + public void getExecutor1() throws Exception { + URL url = URL.valueOf("dubbo://10.20.130.230:20880/context/path?" + + Constants.THREAD_NAME_KEY + "=demo&" + + Constants.CORE_THREADS_KEY + "=1&" + + Constants.THREADS_KEY + "=2&" + + Constants.QUEUES_KEY + "=0"); + ThreadPool threadPool = new LimitedThreadPool(); + ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor(url); + assertThat(executor.getCorePoolSize(), is(1)); + assertThat(executor.getMaximumPoolSize(), is(2)); + assertThat(executor.getQueue(), Matchers.>instanceOf(SynchronousQueue.class)); + assertThat(executor.getRejectedExecutionHandler(), + Matchers.instanceOf(AbortPolicyWithReport.class)); + + final CountDownLatch latch = new CountDownLatch(1); + executor.execute(new Runnable() { + @Override + public void run() { + Thread thread = Thread.currentThread(); + assertThat(thread, instanceOf(InternalThread.class)); + assertThat(thread.getName(), startsWith("demo")); + latch.countDown(); + } + }); + + latch.await(5000, TimeUnit.MICROSECONDS); + assertThat(latch.getCount(), is(0L)); + } + + @Test + public void getExecutor2() throws Exception { + URL url = URL.valueOf("dubbo://10.20.130.230:20880/context/path?" + Constants.QUEUES_KEY + "=1"); + ThreadPool threadPool = new LimitedThreadPool(); + ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor(url); + assertThat(executor.getQueue(), Matchers.>instanceOf(LinkedBlockingQueue.class)); + } +}