Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[1416] fix sentinel-apache-dubbo-adapter full GC problem #1431

Merged
merged 8 commits into from
Jun 4, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -16,66 +16,35 @@
package com.alibaba.csp.sentinel.adapter.dubbo;


import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.Tracer;
import com.alibaba.csp.sentinel.adapter.dubbo.config.DubboConfig;
import com.alibaba.csp.sentinel.context.ContextUtil;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.rpc.*;
import org.apache.dubbo.rpc.Filter;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;

/**
* Base Class of the {@link SentinelDubboProviderFilter} and {@link SentinelDubboConsumerFilter}.
*
* @author Zechao Zheng
*/

public abstract class BaseSentinelDubboFilter extends ListenableFilter {
public BaseSentinelDubboFilter() {
this.listener = new SentinelDubboListener();
}
public abstract class BaseSentinelDubboFilter implements Filter {

static class SentinelDubboListener implements Listener {

public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
onSuccess(appResponse, invoker);
}
/**
* Get method name of dubbo rpc
*
* @param invoker
* @param invocation
* @return
*/
abstract String getMethodName(Invoker invoker, Invocation invocation);

//for compatible dubbo 2.7.5 rename onResponse to onMessage
public void onMessage(Result appResponse, Invoker<?> invoker, Invocation invocation) {
onSuccess(appResponse, invoker);
}
/**
* Get interface name of dubbo rpc
*
* @param invoker
* @return
*/
abstract String getInterfaceName(Invoker invoker);

private void onSuccess(Result appResponse, Invoker<?> invoker) {
if (DubboConfig.getDubboBizExceptionTraceEnabled()) {
traceAndExit(appResponse.getException(), invoker.getUrl());
} else {
traceAndExit(null, invoker.getUrl());
}
}

@Override
public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
traceAndExit(t, invoker.getUrl());
}

}

static void traceAndExit(Throwable throwable, URL url) {
Entry interfaceEntry = (Entry) RpcContext.getContext().get(DubboUtils.DUBBO_INTERFACE_ENTRY_KEY);
Entry methodEntry = (Entry) RpcContext.getContext().get(DubboUtils.DUBBO_METHOD_ENTRY_KEY);
if (methodEntry != null) {
Tracer.traceEntry(throwable, methodEntry);
methodEntry.exit();
RpcContext.getContext().remove(DubboUtils.DUBBO_METHOD_ENTRY_KEY);
}
if (interfaceEntry != null) {
Tracer.traceEntry(throwable, interfaceEntry);
interfaceEntry.exit();
RpcContext.getContext().remove(DubboUtils.DUBBO_INTERFACE_ENTRY_KEY);
}
if (CommonConstants.PROVIDER_SIDE.equals(url.getParameter(CommonConstants.SIDE_KEY))) {
ContextUtil.exit();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
public final class DubboUtils {

public static final String SENTINEL_DUBBO_APPLICATION_KEY = "dubboApplication";
public static final String DUBBO_METHOD_ENTRY_KEY = "dubboMethodEntry";
public static final String DUBBO_INTERFACE_ENTRY_KEY = "dubboInterfaceEntry";

public static String getApplication(Invocation invocation, String defaultValue) {
if (invocation == null || invocation.getAttachments() == null) {
Expand Down Expand Up @@ -69,6 +67,13 @@ public static String getResourceName(Invoker<?> invoker, Invocation invocation,
return getResourceName(invoker, invocation, DubboConfig.getDubboInterfaceGroupAndVersionEnabled());
}
}


public static String getInterfaceName(Invoker invoker) {
return DubboConfig.getDubboInterfaceGroupAndVersionEnabled() ? invoker.getUrl().getColonSeparatedKey()
: invoker.getInterface().getName();
}

private DubboUtils() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.alibaba.csp.sentinel.EntryType;
import com.alibaba.csp.sentinel.ResourceTypeConstants;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.Tracer;
import com.alibaba.csp.sentinel.adapter.dubbo.config.DubboConfig;
import com.alibaba.csp.sentinel.adapter.dubbo.fallback.DubboFallbackRegistry;
import com.alibaba.csp.sentinel.log.RecordLog;
Expand All @@ -28,10 +29,12 @@
import org.apache.dubbo.rpc.InvokeMode;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.support.RpcUtils;

import java.util.Stack;
import java.util.function.BiConsumer;

import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER;

/**
Expand All @@ -52,33 +55,87 @@ public SentinelDubboConsumerFilter() {
RecordLog.info("Sentinel Apache Dubbo consumer filter initialized");
}

@Override
String getMethodName(Invoker invoker, Invocation invocation) {
return DubboUtils.getResourceName(invoker, invocation, DubboConfig.getDubboConsumerPrefix());
}

@Override
String getInterfaceName(Invoker invoker) {
return DubboUtils.getInterfaceName(invoker);
}

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
InvokeMode invokeMode = RpcUtils.getInvokeMode(invoker.getUrl(), invocation);
if (InvokeMode.SYNC == invokeMode) {
return syncInvoke(invoker, invocation);
} else {
return asyncInvoke(invoker, invocation);
}

}

private Result syncInvoke(Invoker<?> invoker, Invocation invocation) {
Entry interfaceEntry = null;
Entry methodEntry = null;
RpcContext rpcContext = RpcContext.getContext();
String methodResourceName = getMethodName(invoker, invocation);
String interfaceResourceName = getInterfaceName(invoker);
try {
String methodResourceName = DubboUtils.getResourceName(invoker, invocation, DubboConfig.getDubboConsumerPrefix());
String interfaceResourceName = DubboConfig.getDubboInterfaceGroupAndVersionEnabled() ? invoker.getUrl().getColonSeparatedKey()
: invoker.getInterface().getName();
InvokeMode invokeMode = RpcUtils.getInvokeMode(invoker.getUrl(), invocation);

if (InvokeMode.SYNC == invokeMode) {
interfaceEntry = SphU.entry(interfaceResourceName, ResourceTypeConstants.COMMON_RPC, EntryType.OUT);
rpcContext.set(DubboUtils.DUBBO_INTERFACE_ENTRY_KEY, interfaceEntry);
methodEntry = SphU.entry(methodResourceName, ResourceTypeConstants.COMMON_RPC, EntryType.OUT, invocation.getArguments());
} else {
// should generate the AsyncEntry when the invoke model in future or async
interfaceEntry = SphU.asyncEntry(interfaceResourceName, ResourceTypeConstants.COMMON_RPC, EntryType.OUT);
rpcContext.set(DubboUtils.DUBBO_INTERFACE_ENTRY_KEY, interfaceEntry);
methodEntry = SphU.asyncEntry(methodResourceName, ResourceTypeConstants.COMMON_RPC, EntryType.OUT, 1, invocation.getArguments());
interfaceEntry = SphU.entry(interfaceResourceName, ResourceTypeConstants.COMMON_RPC, EntryType.OUT);
methodEntry = SphU.entry(methodResourceName, ResourceTypeConstants.COMMON_RPC, EntryType.OUT, invocation.getArguments());
Result result = invoker.invoke(invocation);
if (result.hasException()) {
Tracer.traceEntry(result.getException(), interfaceEntry);
Tracer.traceEntry(result.getException(), methodEntry);
}
return result;
} catch (BlockException e) {
return DubboFallbackRegistry.getConsumerFallback().handle(invoker, invocation, e);
} catch (RpcException e) {
Tracer.traceEntry(e, interfaceEntry);
Tracer.traceEntry(e, methodEntry);
throw e;
} finally {
if (methodEntry != null) {
methodEntry.exit();
}
if (interfaceEntry != null) {
interfaceEntry.exit();
}
rpcContext.set(DubboUtils.DUBBO_METHOD_ENTRY_KEY, methodEntry);
return invoker.invoke(invocation);
}
}


private Result asyncInvoke(Invoker<?> invoker, Invocation invocation) {
Stack<Entry> entryStack = new Stack<>();
linlinisme marked this conversation as resolved.
Show resolved Hide resolved
String methodResourceName = getMethodName(invoker, invocation);
String interfaceResourceName = getInterfaceName(invoker);
try {
entryStack.push(SphU.asyncEntry(interfaceResourceName, ResourceTypeConstants.COMMON_RPC, EntryType.OUT));
entryStack.push(SphU.asyncEntry(methodResourceName, ResourceTypeConstants.COMMON_RPC, EntryType.OUT, 1, invocation.getArguments()));
Result result = invoker.invoke(invocation);
result.whenCompleteWithContext(new BiConsumer<Result, Throwable>() {
@Override
public void accept(Result result, Throwable throwable) {
while (entryStack.size() > 0) {
Entry entry = entryStack.pop();
Tracer.traceEntry(result.getException(), entry);
entry.exit();
}
}
});
return result;
} catch (BlockException e) {
while (entryStack.size() > 0) {
entryStack.pop().exit();
}
return DubboFallbackRegistry.getConsumerFallback().handle(invoker, invocation, e);
}


}

}


Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,17 @@
import com.alibaba.csp.sentinel.EntryType;
import com.alibaba.csp.sentinel.ResourceTypeConstants;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.Tracer;
import com.alibaba.csp.sentinel.adapter.dubbo.config.DubboConfig;
import com.alibaba.csp.sentinel.adapter.dubbo.fallback.DubboFallbackRegistry;
import com.alibaba.csp.sentinel.context.ContextUtil;
import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.rpc.Filter;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;

import static org.apache.dubbo.common.constants.CommonConstants.PROVIDER;
Expand All @@ -52,27 +53,50 @@ public SentinelDubboProviderFilter() {
RecordLog.info("Sentinel Apache Dubbo provider filter initialized");
}

@Override
String getMethodName(Invoker invoker, Invocation invocation) {
return DubboUtils.getResourceName(invoker, invocation, DubboConfig.getDubboProviderPrefix());
}

@Override
String getInterfaceName(Invoker invoker) {
return DubboUtils.getInterfaceName(invoker);
}

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
// Get origin caller.
String application = DubboUtils.getApplication(invocation, "");
RpcContext rpcContext = RpcContext.getContext();
Entry interfaceEntry = null;
Entry methodEntry = null;
String methodResourceName = getMethodName(invoker, invocation);
String interfaceResourceName = getInterfaceName(invoker);
try {
String methodResourceName = DubboUtils.getResourceName(invoker, invocation, DubboConfig.getDubboProviderPrefix());
String interfaceResourceName = DubboConfig.getDubboInterfaceGroupAndVersionEnabled() ? invoker.getUrl().getColonSeparatedKey()
: invoker.getInterface().getName();
// Only need to create entrance context at provider side, as context will take effect
// at entrance of invocation chain only (for inbound traffic).
ContextUtil.enter(methodResourceName, application);
interfaceEntry = SphU.entry(interfaceResourceName, ResourceTypeConstants.COMMON_RPC, EntryType.IN);
rpcContext.set(DubboUtils.DUBBO_INTERFACE_ENTRY_KEY, interfaceEntry);
methodEntry = SphU.entry(methodResourceName, ResourceTypeConstants.COMMON_RPC, EntryType.IN, invocation.getArguments());
rpcContext.set(DubboUtils.DUBBO_METHOD_ENTRY_KEY, methodEntry);
return invoker.invoke(invocation);
Result result = invoker.invoke(invocation);
if (result.hasException()) {
Tracer.traceEntry(result.getException(), interfaceEntry);
Tracer.traceEntry(result.getException(), methodEntry);
}
return result;
} catch (BlockException e) {
return DubboFallbackRegistry.getProviderFallback().handle(invoker, invocation, e);
} catch (RpcException e) {
Tracer.traceEntry(e, interfaceEntry);
Tracer.traceEntry(e, methodEntry);
throw e;
} finally {
if (methodEntry != null) {
methodEntry.exit(1, invocation.getArguments());
}
if (interfaceEntry != null) {
interfaceEntry.exit();
}
ContextUtil.exit();
}
}

Expand Down
Loading