Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix FailbackClusterInvoker one risk of memory leak #2425 #2822

Merged
merged 27 commits into from
Dec 25, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,105 +16,150 @@
*/
package org.apache.dubbo.rpc.cluster.support;

import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.threadlocal.NamedInternalThreadFactory;
import org.apache.dubbo.common.timer.HashedWheelTimer;
import org.apache.dubbo.common.timer.Timeout;
import org.apache.dubbo.common.timer.Timer;
import org.apache.dubbo.common.timer.TimerTask;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.cluster.LoadBalance;

import java.util.HashMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
* When fails, record failure requests and schedule for retry on a regular interval.
* Especially useful for services of notification.
*
* <a href="http://en.wikipedia.org/wiki/Failback">Failback</a>
*
*/
public class FailbackClusterInvoker<T> extends AbstractClusterInvoker<T> {

private static final Logger logger = LoggerFactory.getLogger(FailbackClusterInvoker.class);

private static final long RETRY_FAILED_PERIOD = 5 * 1000;
private static final long RETRY_FAILED_PERIOD = 5;

/**
* Use {@link NamedInternalThreadFactory} to produce {@link org.apache.dubbo.common.threadlocal.InternalThread}
* which with the use of {@link org.apache.dubbo.common.threadlocal.InternalThreadLocal} in {@link RpcContext}.
*/
private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2,
new NamedInternalThreadFactory("failback-cluster-timer", true));
private final int retries;

private final ConcurrentMap<Invocation, AbstractClusterInvoker<?>> failed = new ConcurrentHashMap<>();
private volatile ScheduledFuture<?> retryFuture;
private final int failbackTasks;

private volatile Timer failTimer;

public FailbackClusterInvoker(Directory<T> directory) {
super(directory);

int retriesConfig = getUrl().getParameter(Constants.RETRIES_KEY, Constants.DEFAULT_FAILBACK_TIMES);
if (retriesConfig <= 0) {
retriesConfig = Constants.DEFAULT_FAILBACK_TIMES;
}
int failbackTasksConfig = getUrl().getParameter(Constants.FAIL_BACK_TASKS_KEY, Constants.DEFAULT_FAILBACK_TASKS);
if (failbackTasksConfig <= 0) {
failbackTasksConfig = Constants.DEFAULT_FAILBACK_TASKS;
}
retries = retriesConfig;
failbackTasks = failbackTasksConfig;
}

private void addFailed(Invocation invocation, AbstractClusterInvoker<?> invoker) {
if (retryFuture == null) {
private void addFailed(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, Invoker<T> lastInvoker) {
if (failTimer == null) {
synchronized (this) {
if (retryFuture == null) {
retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {

@Override
public void run() {
// collect retry statistics
try {
retryFailed();
} catch (Throwable t) { // Defensive fault tolerance
logger.error("Unexpected error occur at collect statistic", t);
}
}
}, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS);
if (failTimer == null) {
failTimer = new HashedWheelTimer(
new NamedThreadFactory("failback-cluster-timer", true),
1,
TimeUnit.SECONDS, 32, failbackTasks);
}
}
}
failed.put(invocation, invoker);
}

void retryFailed() {
if (failed.isEmpty()) {
return;
}
for (Map.Entry<Invocation, AbstractClusterInvoker<?>> entry : new HashMap<>(failed).entrySet()) {
Invocation invocation = entry.getKey();
Invoker<?> invoker = entry.getValue();
try {
invoker.invoke(invocation);
failed.remove(invocation);
} catch (Throwable e) {
logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e);
}
RetryTimerTask retryTimerTask = new RetryTimerTask(loadbalance, invocation, invokers, lastInvoker, retries, RETRY_FAILED_PERIOD);
try {
failTimer.newTimeout(retryTimerTask, RETRY_FAILED_PERIOD, TimeUnit.SECONDS);
} catch (Throwable e) {
logger.error("Failback background works error,invocation->" + invocation + ", exception: " + e.getMessage());
}
}

@Override
protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
Invoker<T> invoker = null;
try {
checkInvokers(invokers, invocation);
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
invoker = select(loadbalance, invocation, invokers, null);
return invoker.invoke(invocation);
} catch (Throwable e) {
logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: "
+ e.getMessage() + ", ", e);
addFailed(invocation, this);
addFailed(loadbalance, invocation, invokers, invoker);
return new RpcResult(); // ignore
}
}

@Override
public void destroy() {
super.destroy();
if (failTimer != null) {
failTimer.stop();
}
}

/**
* RetryTimerTask
*/
private class RetryTimerTask implements TimerTask {
private final Invocation invocation;
private final LoadBalance loadbalance;
private final List<Invoker<T>> invokers;
private final List<Invoker<T>> lastInvokers = new ArrayList<>();
private final int retries;
private final long tick;
private int retryTimes = 0;

RetryTimerTask(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, Invoker<T> lastInvoker, int retries, long tick) {
this.loadbalance = loadbalance;
this.invocation = invocation;
this.invokers = invokers;
this.retries = retries;
this.tick = tick;
lastInvokers.add(lastInvoker);
}

@Override
public void run(Timeout timeout) {
try {
Invoker<T> retryInvoker = select(loadbalance, invocation, invokers, lastInvokers);
lastInvokers.clear();
lastInvokers.add(retryInvoker);
retryInvoker.invoke(invocation);
} catch (Throwable e) {
logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e);
if ((++retryTimes) >= retries) {
logger.error("Failed retry times exceed threshold (" + retries + "), We have to abandon, invocation->" + invocation);
} else {
rePut(timeout);
}
}
}

private void rePut(Timeout timeout) {
if (timeout == null) {
return;
}

Timer timer = timeout.timer();
if (timer.isStop() || timeout.isCancelled()) {
return;
}

timer.newTimeout(timeout.task(), tick, TimeUnit.SECONDS);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.dubbo.rpc.cluster.support;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.DubboAppender;
import org.apache.dubbo.common.utils.LogUtil;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
Expand All @@ -25,22 +26,32 @@
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.cluster.Directory;

import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Before;
import org.junit.FixMethodOrder;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;

/**
* FailbackClusterInvokerTest
*
* add annotation @FixMethodOrder, the testARetryFailed Method must to first execution
*/
@SuppressWarnings("unchecked")
@FixMethodOrder(org.junit.runners.MethodSorters.NAME_ASCENDING)
public class FailbackClusterInvokerTest {

List<Invoker<FailbackClusterInvokerTest>> invokers = new ArrayList<Invoker<FailbackClusterInvokerTest>>();
URL url = URL.valueOf("test://test:11/test");
URL url = URL.valueOf("test://test:11/test?retries=2&failbacktasks=2");
Invoker<FailbackClusterInvokerTest> invoker = mock(Invoker.class);
RpcInvocation invocation = new RpcInvocation();
Directory<FailbackClusterInvokerTest> dic;
Expand Down Expand Up @@ -76,16 +87,17 @@ private void resetInvokerToNoException() {
}

@Test
public void testInvokeExceptoin() {
public void testInvokeException() {
resetInvokerToException();
FailbackClusterInvoker<FailbackClusterInvokerTest> invoker = new FailbackClusterInvoker<FailbackClusterInvokerTest>(
dic);
invoker.invoke(invocation);
Assert.assertNull(RpcContext.getContext().getInvoker());
DubboAppender.clear();
}

@Test()
public void testInvokeNoExceptoin() {
public void testInvokeNoException() {

resetInvokerToNoException();

Expand All @@ -112,21 +124,34 @@ public void testNoInvoke() {
FailbackClusterInvoker<FailbackClusterInvokerTest> invoker = new FailbackClusterInvoker<FailbackClusterInvokerTest>(
dic);
LogUtil.start();
DubboAppender.clear();
invoker.invoke(invocation);
assertEquals(1, LogUtil.findMessage("Failback to invoke"));
LogUtil.stop();
}

@Test()
public void testRetryFailed() {
public void testARetryFailed() throws Exception {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could u pls tell us why u rename the method and why u add the FixMethodOrder annotation?
:)

//Test retries and

resetInvokerToException();

FailbackClusterInvoker<FailbackClusterInvokerTest> invoker = new FailbackClusterInvoker<FailbackClusterInvokerTest>(
dic);
LogUtil.start();
DubboAppender.clear();
invoker.invoke(invocation);
invoker.invoke(invocation);
invoker.invoke(invocation);
Assert.assertNull(RpcContext.getContext().getInvoker());
invoker.retryFailed();// when retry the invoker which get from failed map already is not the mocked invoker,so
// invoker.retryFailed();// when retry the invoker which get from failed map already is not the mocked invoker,so
//Ensure that the main thread is online
CountDownLatch countDown = new CountDownLatch(1);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think u should assert that the logger contains the specific msg.

countDown.await(15000L, TimeUnit.MILLISECONDS);
LogUtil.stop();
Assert.assertEquals("must have four error message ", 4, LogUtil.findMessage(Level.ERROR, "Failed retry to invoke method"));
Assert.assertEquals("must have two error message ", 2, LogUtil.findMessage(Level.ERROR, "Failed retry times exceed threshold"));
Assert.assertEquals("must have one error message ", 1, LogUtil.findMessage(Level.ERROR, "Failback background works error"));
// it can be invoke successfully
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,10 @@ public class Constants {

public static final int DEFAULT_RETRIES = 2;

public static final int DEFAULT_FAILBACK_TASKS = 100;

public static final int DEFAULT_FAILBACK_TIMES = 3;

// default buffer size is 8k.
public static final int DEFAULT_BUFFER_SIZE = 8 * 1024;

Expand Down Expand Up @@ -300,6 +304,8 @@ public class Constants {

public static final String RETRIES_KEY = "retries";

public static final String FAIL_BACK_TASKS_KEY = "failbacktasks";

public static final String PROMPT_KEY = "prompt";

public static final String DEFAULT_PROMPT = "dubbo>";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@
<xsd:documentation><![CDATA[ The method retry times. ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="failbacktasks" type="xsd:string">
<xsd:annotation>
<xsd:documentation><![CDATA[ The max failback tasks capacity size. ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="actives" type="xsd:string">
<xsd:annotation>
<xsd:documentation><![CDATA[ The max active requests. ]]></xsd:documentation>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@
<xsd:documentation><![CDATA[ The method retry times. ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="failbacktasks" type="xsd:string">
<xsd:annotation>
<xsd:documentation><![CDATA[ The max failback tasks capacity size. ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="actives" type="xsd:string">
<xsd:annotation>
<xsd:documentation><![CDATA[ The max active requests. ]]></xsd:documentation>
Expand Down