Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/adapt weibo #171

Merged
merged 13 commits into from
Aug 17, 2016
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ public static Map<String, Integer> parseExport(String export) {
int port = MathUtil.parseInt(ppDetail[0], 0);
if(port <= 0){
throw new MotanServiceException("Export is malformed :" + export);
}else{
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

代码没格式化

pps.put(MotanConstants.PROTOCOL_MOTAN, port);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.weibo.api.motan.exception;

import com.weibo.api.motan.rpc.RpcContext;


/**
* @author maijunsheng
Expand Down Expand Up @@ -82,9 +84,8 @@ public String getMessage() {
message = motanErrorMsg.getMessage();
}

// TODO 统一上下文 requestid
return "error_message: " + message + ", status: " + motanErrorMsg.getStatus() + ", error_code: " + motanErrorMsg.getErrorCode()
+ ",r=";
+ ",r=" + RpcContext.getContext().getRequestId();
}

public int getStatus() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,14 @@ public Response filter(Caller<?> caller, Request request) {
}

Application application;
String statName =
caller.getUrl().getProtocol() + MotanConstants.PROTOCOL_SEPARATOR + MotanFrameworkUtil.getGroupMethodString(request);
if (caller instanceof Provider) {
application = new Application(ApplicationInfo.STATISTIC, "rpc_service");
} else {
application = ApplicationInfo.getApplication(caller.getUrl());
StatsUtil.accessStatistic(statName, application, end, end - start, bizProcessTime, accessStatus);
}
StatsUtil.accessStatistic(
caller.getUrl().getProtocol() + MotanConstants.PROTOCOL_SEPARATOR + MotanFrameworkUtil.getFullMethodString(request),
application, end, end - start, bizProcessTime, accessStatus);
application = ApplicationInfo.getApplication(caller.getUrl());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

应该在else里吧?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

应该在外面,if块的作用是对server端的所有请求汇总统计,单个service无论server端还是client端都需要统计

StatsUtil.accessStatistic(statName, application, end, end - start, bizProcessTime, accessStatus);

}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ public boolean isTraceEnabled() {
public boolean isDebugEnabled() {
return debug.isDebugEnabled();
}

@Override
public boolean isInfoEnabled() {
return info.isInfoEnabled();
}

@Override
public boolean isWarnEnabled() {
Expand All @@ -127,4 +132,6 @@ public boolean isErrorEnabled() {
public boolean isStatsEnabled() {
return serviceStats.isInfoEnabled();
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ public interface LogService {
boolean isTraceEnabled();

boolean isDebugEnabled();

boolean isInfoEnabled();

boolean isWarnEnabled();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package com.weibo.api.motan.protocol.mock;

import com.weibo.api.motan.common.URLParamType;
import com.weibo.api.motan.core.extension.ExtensionLoader;
import com.weibo.api.motan.exception.MotanFrameworkException;
import com.weibo.api.motan.protocol.AbstractProtocol;
import com.weibo.api.motan.rpc.AbstractExporter;
import com.weibo.api.motan.rpc.AbstractReferer;
import com.weibo.api.motan.rpc.Exporter;
import com.weibo.api.motan.rpc.Provider;
import com.weibo.api.motan.rpc.Referer;
import com.weibo.api.motan.rpc.Request;
import com.weibo.api.motan.rpc.Response;
import com.weibo.api.motan.rpc.URL;
import com.weibo.api.motan.transport.Channel;
import com.weibo.api.motan.transport.EndpointFactory;
import com.weibo.api.motan.transport.ProviderMessageRouter;
import com.weibo.api.motan.transport.Server;
import com.weibo.api.motan.util.LoggerUtil;
import com.weibo.api.motan.util.MotanFrameworkUtil;

/**
*
* @Description:abstract mock protocol, it can mock all rpc from server or client.
* implementation class must implement 'processRequest()' method, and declare SpiMeta annotation.
* @author zhanglei28
* @date 2016-3-14
*
*/
public abstract class AbstractMockRpcProtocol extends AbstractProtocol {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mock的场景更像是在单元测试里,这里用Local或者Direct之类的词更好吧

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个类的作用是不管任何service全部都mock处理,就是适用ci测试场景的。测试时继承这个类,实现想要mock的逻辑就可以。

private static ProviderMessageRouter mockProviderMessageRouter;

@Override
protected <T> Exporter<T> createExporter(Provider<T> provider, URL url) {
Exporter<T> exporter = new MockRpcExporter<T>(provider, url);
LoggerUtil.info("create MockRpcExporter: url={}", url);
return exporter;
}

@Override
protected <T> Referer<T> createReferer(Class<T> clz, URL url, URL serviceUrl) {
Referer<T> referer = new MockRpcReferer<T>(clz, url, serviceUrl);
LoggerUtil.info("create MockRpcReferer: url={}", url);
return referer;
}

class MockRpcExporter<T> extends AbstractExporter<T> {
private Server server;
private EndpointFactory endpointFactory;

public MockRpcExporter(Provider<T> provider, URL url) {
super(provider, url);

ProviderMessageRouter requestRouter = getMockProviderMessageRouter(url);
endpointFactory =
ExtensionLoader.getExtensionLoader(EndpointFactory.class).getExtension(
url.getParameter(URLParamType.endpointFactory.getName(), URLParamType.endpointFactory.getValue()));
server = endpointFactory.createServer(url, requestRouter);
}

@Override
public void unexport() {
String protocolKey = MotanFrameworkUtil.getProtocolKey(url);
Exporter<T> exporter = (Exporter<T>) exporterMap.remove(protocolKey);
if (exporter != null) {
exporter.destroy();
}
}

@Override
public void destroy() {
endpointFactory.safeReleaseResource(server, url);
LoggerUtil.info("MockRpcExporter destory Success: url={}", url);
}

@Override
protected boolean doInit() {
return server.open();
}

@Override
public boolean isAvailable() {
return server.isAvailable();
}

}


class MockRpcReferer<T> extends AbstractReferer<T> {


public MockRpcReferer(Class<T> clz, URL url, URL serviceUrl) {
super(clz, url, serviceUrl);
}


@Override
public void destroy() {
LoggerUtil.info("MockRpcReferer destroy Success: url={}", url);
}

@Override
protected Response doCall(Request request) {
return processRequest(request);
}

@Override
protected boolean doInit() {
LoggerUtil.info("MockRpcReferer init Success: url={}", url);
return true;
}

}

// process all urls。
public ProviderMessageRouter getMockProviderMessageRouter(URL url) {
if (mockProviderMessageRouter == null) {
//default
mockProviderMessageRouter = new MockProviderMessageRouter();
}

return mockProviderMessageRouter;
}

class MockProviderMessageRouter extends ProviderMessageRouter {

@Override
public Object handle(Channel channel, Object message) {
if (channel == null || message == null) {
throw new MotanFrameworkException("RequestRouter handler(channel, message) params is null");
}

if (!(message instanceof Request)) {
throw new MotanFrameworkException("RequestRouter message type not support: " + message.getClass());
}
return processRequest((Request) message);
}

}

/**
* process request. request is mock processed by client or server
*
* @param request
* @return
*/
protected abstract Response processRequest(Request request);

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,15 @@ public class ApplicationInfo {
public static Application getApplication(URL url) {
Application application = applications.get(url.getPath());
if (application == null && MotanConstants.NODE_TYPE_REFERER.equals(url.getParameter(URLParamType.nodeType.getName()))) {
String app = url.getParameter(URLParamType.application.getName(), URLParamType.application.getValue()) + CLIENT;
String module = url.getParameter(URLParamType.module.getName(), URLParamType.module.getValue()) + CLIENT;

applications.putIfAbsent(url.getPath() + CLIENT, new Application(app, module));
application = applications.get(url.getPath() + CLIENT);
}
if(application == null){
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个判断没必要吧,上面已经判空了

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

application = applications.get(url.getPath() + CLIENT);时已经重新赋值了,有可能 不为空。
这块的逻辑是先按application查找,找不到时可能是远程请求过来的application,这种情况加一个client后缀查找,没有就添加一个带client后缀的application

String app = url.getParameter(URLParamType.application.getName(), URLParamType.application.getValue()) + CLIENT;
String module = url.getParameter(URLParamType.module.getName(), URLParamType.module.getValue()) + CLIENT;

applications.putIfAbsent(url.getPath() + CLIENT, new Application(app, module));
application = applications.get(url.getPath() + CLIENT);
}
}
return application;
}

Expand Down
71 changes: 68 additions & 3 deletions motan-core/src/main/java/com/weibo/api/motan/rpc/RpcContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,21 @@
import java.util.HashMap;
import java.util.Map;

import com.weibo.api.motan.common.URLParamType;

/**
* rpc session context
*
* @author maijunsheng
*
*/
public class RpcContext {
private Map<Object, Object> attribute = new HashMap<Object, Object>(1);
private Map<Object, Object> attribute = new HashMap<Object, Object>();
private Request request;
private Response response;
private String clientRequestId = null;

private static ThreadLocal<RpcContext> localContext = new ThreadLocal<RpcContext>() {
private static final ThreadLocal<RpcContext> localContext = new ThreadLocal<RpcContext>() {
protected RpcContext initialValue() {
return new RpcContext();
}
Expand All @@ -37,12 +42,72 @@ protected RpcContext initialValue() {
public static RpcContext getContext() {
return localContext.get();
}

/**
* init new rpcContext with request
* @param request
* @return
*/
public static RpcContext init(Request request){
RpcContext context = new RpcContext();
if(request != null){
context.setRequest(request);
context.setClientRequestId(request.getAttachments().get(URLParamType.requestIdFromClient.getName()));
}
localContext.set(context);
return context;
}

public static void destroy() {
localContext.set(null);
localContext.remove();
}

/**
* clientRequestId > request.id
* @return
*/
public String getRequestId(){
if(clientRequestId != null){
return clientRequestId;
} else{
return request == null ? null : String.valueOf(request.getRequestId());
}
}

public void putAttribute(Object key, Object value){
attribute.put(key, value);
}

public Object getAttribute(Object key) {
return attribute.get(key);
}

public void revomeAttribute(Object key){
attribute.remove(key);
}

public Request getRequest() {
return request;
}

public void setRequest(Request request) {
this.request = request;
}

public Response getResponse() {
return response;
}

public void setResponse(Response response) {
this.response = response;
}

public String getClientRequestId() {
return clientRequestId;
}

public void setClientRequestId(String clientRequestId) {
this.clientRequestId = clientRequestId;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2009-2016 Weibo, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package com.weibo.api.motan.rpc.init;

import com.weibo.api.motan.core.extension.Scope;
import com.weibo.api.motan.core.extension.Spi;

/**
*
* @Description Initializable. the implement must be thread safe!
* @author zhanglei
* @date 2016-6-15
*
*/
@Spi(scope = Scope.SINGLETON)
public interface Initializable {
void init();

}
Loading