diff --git a/client.go b/client.go index e561892..3c5ba08 100644 --- a/client.go +++ b/client.go @@ -78,14 +78,18 @@ func (u *URL) SetPort(port *int) *URL { // RpcInvocation define rpc invocation type RpcInvocation struct { - ServiceName *string - MethodName *string - ParameterIn *proto.Message - Attachment []byte - LogId *int64 - CompressType *int32 - AuthenticateData []byte - ChunkSize uint32 + ServiceName *string + MethodName *string + ParameterIn *proto.Message + Attachment []byte + LogId *int64 + CompressType *int32 + AuthenticateData []byte + ChunkSize uint32 + TraceId int64 + SpanId int64 + ParentSpanId int64 + RpcRequestMetaExt map[string]string } // NewRpcCient new rpc client @@ -145,6 +149,10 @@ func (r *RpcInvocation) GetRequestRpcDataPackage() (*RpcDataPackage, error) { rpcDataPackage.MagicCode(MAGIC_CODE) rpcDataPackage.AuthenticationData(r.AuthenticateData) rpcDataPackage.chunkSize = r.ChunkSize + rpcDataPackage.TraceId(r.TraceId) + rpcDataPackage.SpanId(r.SpanId) + rpcDataPackage.ParentSpanId(r.ParentSpanId) + rpcDataPackage.RpcRequestMetaExt(r.RpcRequestMetaExt) if r.CompressType != nil { rpcDataPackage.CompressType(*r.CompressType) } diff --git a/client_test.go b/client_test.go index 9a1a76c..d99223a 100644 --- a/client_test.go +++ b/client_test.go @@ -40,6 +40,17 @@ func (as *StringMatchAuthService) Authenticate(service, name string, authToken [ return strings.Compare(AUTH_TOKEN, string(authToken)) == 0 } +type AddOneTraceService struct { +} + +// Trace +func (as *AddOneTraceService) Trace(service, name string, traceInfo *baidurpc.TraceInfo) *baidurpc.TraceInfo { + *traceInfo.SpanId++ + *traceInfo.TraceId++ + *traceInfo.ParentSpanId++ + return traceInfo +} + // TestSingleTcpConnectionClient func TestSingleTcpConnectionClient(t *testing.T) { Convey("TestSingleTcpConnectionClient", t, func() { @@ -230,6 +241,7 @@ func startRpcServer(chunksize uint32) *baidurpc.TcpServer { } rpcServer.RegisterNameWithMethodMapping("EchoService", echoservice, methodMapping) + rpcServer.SetTraceService(new(AddOneTraceService)) rpcServer.Start() return rpcServer @@ -291,6 +303,10 @@ func doSimpleRPCInvokeWithSignatureWithConvey(rpcClient *baidurpc.RpcClient, ser rpcInvocation.SetParameterIn(&dm) rpcInvocation.LogId = proto.Int64(1) rpcInvocation.ChunkSize = chunkSize + rpcInvocation.TraceId = 10 + rpcInvocation.SpanId = 11 + rpcInvocation.ParentSpanId = 12 + rpcInvocation.RpcRequestMetaExt = map[string]string{"key1": "value1"} if withAttachement { rpcInvocation.Attachment = []byte("This is attachment data") @@ -333,6 +349,10 @@ func doSimpleRPCInvokeWithSignatureWithConvey(rpcClient *baidurpc.RpcClient, ser So(string(response.Attachment), ShouldEqual, "I am a attachementThis is attachment data") } + So(*response.GetTraceId(), ShouldEqual, rpcInvocation.TraceId+1) + So(*response.GetParentSpanId(), ShouldEqual, rpcInvocation.ParentSpanId+1) + So(*response.GetParentSpanId(), ShouldEqual, rpcInvocation.ParentSpanId+1) + So(response.GetRpcRequestMetaExt()["key1"], ShouldEqual, "value1") }) } diff --git a/data.pb.go b/data.pb.go index 7bc77f6..8cb7394 100644 --- a/data.pb.go +++ b/data.pb.go @@ -82,17 +82,20 @@ var _ = math.Inf const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package type Request struct { - ServiceName *string `protobuf:"bytes,1,req,name=serviceName" json:"serviceName,omitempty"` - MethodName *string `protobuf:"bytes,2,req,name=methodName" json:"methodName,omitempty"` - LogId *int64 `protobuf:"varint,3,opt,name=logId" json:"logId,omitempty"` - ExtraParam []byte `protobuf:"bytes,4,opt,name=extraParam" json:"extraParam,omitempty"` - XXX_unrecognized []byte `json:"-"` + ServiceName *string `protobuf:"bytes,1,req,name=serviceName" json:"serviceName,omitempty"` + MethodName *string `protobuf:"bytes,2,req,name=methodName" json:"methodName,omitempty"` + LogId *int64 `protobuf:"varint,3,opt,name=logId" json:"logId,omitempty"` + TraceId *int64 `protobuf:"varint,4,opt,name=traceId" json:"traceId,omitempty"` + SpanId *int64 `protobuf:"varint,5,opt,name=spanId" json:"spanId,omitempty"` + ParentSpanId *int64 `protobuf:"varint,6,opt,name=parentSpanId" json:"parentSpanId,omitempty"` + RpcRequestMetaExt []*RpcRequestMetaExtField `protobuf:"bytes,7,rep,name=rpcRequestMetaExt" json:"rpcRequestMetaExt,omitempty"` + ExtraParam []byte `protobuf:"bytes,110,opt,name=extraParam" json:"extraParam,omitempty"` + XXX_unrecognized []byte `json:"-"` } -func (m *Request) Reset() { *m = Request{} } -func (m *Request) String() string { return proto.CompactTextString(m) } -func (*Request) ProtoMessage() {} -func (*Request) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } +func (m *Request) Reset() { *m = Request{} } +func (m *Request) String() string { return proto.CompactTextString(m) } +func (*Request) ProtoMessage() {} func (m *Request) GetServiceName() string { if m != nil && m.ServiceName != nil { @@ -122,16 +125,38 @@ func (m *Request) GetExtraParam() []byte { return nil } +type RpcRequestMetaExtField struct { + Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` + Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` +} + +func (m *RpcRequestMetaExtField) Reset() { *m = RpcRequestMetaExtField{} } +func (m *RpcRequestMetaExtField) String() string { return proto.CompactTextString(m) } +func (*RpcRequestMetaExtField) ProtoMessage() {} + +func (m *RpcRequestMetaExtField) GetKey() string { + if m != nil { + return m.Key + } + return "" +} + +func (m *RpcRequestMetaExtField) GetValue() string { + if m != nil { + return m.Value + } + return "" +} + type Response struct { ErrorCode *int32 `protobuf:"varint,1,opt,name=errorCode" json:"errorCode,omitempty"` ErrorText *string `protobuf:"bytes,2,opt,name=errorText" json:"errorText,omitempty"` XXX_unrecognized []byte `json:"-"` } -func (m *Response) Reset() { *m = Response{} } -func (m *Response) String() string { return proto.CompactTextString(m) } -func (*Response) ProtoMessage() {} -func (*Response) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } +func (m *Response) Reset() { *m = Response{} } +func (m *Response) String() string { return proto.CompactTextString(m) } +func (*Response) ProtoMessage() {} func (m *Response) GetErrorCode() int32 { if m != nil && m.ErrorCode != nil { @@ -153,10 +178,9 @@ type ChunkInfo struct { XXX_unrecognized []byte `json:"-"` } -func (m *ChunkInfo) Reset() { *m = ChunkInfo{} } -func (m *ChunkInfo) String() string { return proto.CompactTextString(m) } -func (*ChunkInfo) ProtoMessage() {} -func (*ChunkInfo) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } +func (m *ChunkInfo) Reset() { *m = ChunkInfo{} } +func (m *ChunkInfo) String() string { return proto.CompactTextString(m) } +func (*ChunkInfo) ProtoMessage() {} func (m *ChunkInfo) GetStreamId() int64 { if m != nil && m.StreamId != nil { @@ -183,10 +207,9 @@ type RpcMeta struct { XXX_unrecognized []byte `json:"-"` } -func (m *RpcMeta) Reset() { *m = RpcMeta{} } -func (m *RpcMeta) String() string { return proto.CompactTextString(m) } -func (*RpcMeta) ProtoMessage() {} -func (*RpcMeta) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} } +func (m *RpcMeta) Reset() { *m = RpcMeta{} } +func (m *RpcMeta) String() string { return proto.CompactTextString(m) } +func (*RpcMeta) ProtoMessage() {} func (m *RpcMeta) GetRequest() *Request { if m != nil { @@ -236,39 +259,3 @@ func (m *RpcMeta) GetAuthenticationData() []byte { } return nil } - -func init() { - proto.RegisterType((*Request)(nil), "baidurpc.Request") - proto.RegisterType((*Response)(nil), "baidurpc.Response") - proto.RegisterType((*ChunkInfo)(nil), "baidurpc.ChunkInfo") - proto.RegisterType((*RpcMeta)(nil), "baidurpc.RpcMeta") -} - -func init() { proto.RegisterFile("Request.proto", fileDescriptor0) } - -var fileDescriptor0 = []byte{ - // 361 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x6c, 0x91, 0xdf, 0x6a, 0xdb, 0x30, - 0x14, 0xc6, 0xb1, 0x3d, 0xcf, 0xf6, 0x49, 0x32, 0xd8, 0xd9, 0x2e, 0xc4, 0x18, 0xc3, 0x98, 0x31, - 0x0c, 0x03, 0xc3, 0xf2, 0x06, 0x23, 0x63, 0xe0, 0x8b, 0x8d, 0xa2, 0xe6, 0x05, 0x54, 0xf9, 0xb4, - 0x36, 0x8d, 0x2d, 0x57, 0x96, 0x4b, 0xda, 0xbb, 0x3e, 0x57, 0x5f, 0xae, 0x44, 0x89, 0xff, 0xa4, - 0xf4, 0xf2, 0xfc, 0xbe, 0x4f, 0xd2, 0x77, 0x3e, 0xc1, 0x8a, 0xd3, 0x5d, 0x4f, 0x9d, 0xc9, 0x5a, - 0xad, 0x8c, 0xc2, 0xf0, 0x4a, 0x54, 0x45, 0xaf, 0x5b, 0x99, 0x3c, 0x39, 0x10, 0x9c, 0x34, 0x8c, - 0x61, 0xd1, 0x91, 0xbe, 0xaf, 0x24, 0xfd, 0x17, 0x35, 0x31, 0x27, 0x76, 0xd3, 0x88, 0xcf, 0x11, - 0x7e, 0x03, 0xa8, 0xc9, 0x94, 0xaa, 0xb0, 0x06, 0xd7, 0x1a, 0x66, 0x04, 0x3f, 0x83, 0xbf, 0x53, - 0x37, 0x79, 0xc1, 0xbc, 0xd8, 0x49, 0x3d, 0x7e, 0x1c, 0x0e, 0xa7, 0x68, 0x6f, 0xb4, 0xb8, 0x10, - 0x5a, 0xd4, 0xec, 0x5d, 0xec, 0xa4, 0x4b, 0x3e, 0x23, 0xc9, 0x5f, 0x08, 0x39, 0x75, 0xad, 0x6a, - 0x3a, 0xc2, 0xaf, 0x10, 0x91, 0xd6, 0x4a, 0x6f, 0x54, 0x71, 0x48, 0xe0, 0xa4, 0x3e, 0x9f, 0xc0, - 0xa8, 0x6e, 0x69, 0x6f, 0x98, 0x1b, 0x3b, 0x69, 0xc4, 0x27, 0x90, 0xfc, 0x86, 0x68, 0x53, 0xf6, - 0xcd, 0x6d, 0xde, 0x5c, 0x2b, 0xfc, 0x02, 0x61, 0x67, 0x34, 0x89, 0x3a, 0x2f, 0xec, 0x26, 0x1e, - 0x1f, 0x67, 0x64, 0x10, 0x48, 0x6b, 0x2c, 0xec, 0x0e, 0x1e, 0x1f, 0xc6, 0xe4, 0xd9, 0x85, 0x80, - 0xb7, 0xf2, 0x1f, 0x19, 0x81, 0x3f, 0x21, 0xd0, 0xc7, 0x66, 0x6c, 0x90, 0xc5, 0xfa, 0x63, 0x36, - 0xd4, 0x96, 0x9d, 0x2a, 0xe3, 0x83, 0x03, 0x33, 0x08, 0xf5, 0x69, 0x07, 0x1b, 0x6c, 0xb1, 0xc6, - 0xb9, 0xfb, 0xa8, 0xf0, 0xd1, 0x83, 0x09, 0x2c, 0xa5, 0xaa, 0x5b, 0x4d, 0x5d, 0xb7, 0x7d, 0x68, - 0xc9, 0x16, 0xe6, 0xf3, 0x33, 0x86, 0xdf, 0x61, 0x25, 0x95, 0xd6, 0xb4, 0x13, 0xa6, 0x52, 0x4d, - 0x5e, 0xd8, 0xea, 0x3c, 0x7e, 0x0e, 0xf1, 0x07, 0x7c, 0x10, 0xc6, 0x08, 0x59, 0xd6, 0xd4, 0x98, - 0xcb, 0xea, 0x91, 0x98, 0x6f, 0xef, 0x7a, 0x45, 0xf1, 0x17, 0x44, 0x72, 0x68, 0x87, 0xbd, 0xb7, - 0x11, 0x3f, 0x4d, 0x11, 0xc7, 0xe2, 0xf8, 0xe4, 0xc2, 0x0c, 0x50, 0xf4, 0xa6, 0xa4, 0xc6, 0x54, - 0xd2, 0x3e, 0xf7, 0x47, 0x18, 0xc1, 0x02, 0xfb, 0x81, 0x6f, 0x28, 0x2f, 0x01, 0x00, 0x00, 0xff, - 0xff, 0x7d, 0x01, 0x53, 0x40, 0x66, 0x02, 0x00, 0x00, -} diff --git a/docs/Demo.md b/docs/Demo.md index 3097910..59192a4 100644 --- a/docs/Demo.md +++ b/docs/Demo.md @@ -94,6 +94,58 @@ baidurpc是一种基于TCP协议的二进制高性能RPC通信协议实现。它 至此RPC已经开发完成,运行上面代码,就可以发布完成. +#### 开发启验证功能 + +实现 AuthService 接口 + +```go +type StringMatchAuthService struct { +} + +// Authenticate +func (as *StringMatchAuthService) Authenticate(service, name string, authToken []byte) bool { + if authToken == nil { + return false + } + return strings.Compare(AUTH_TOKEN, string(authToken)) == 0 +} + +``` +设置到service对象 +```go + +// ... +rpcServer := baidurpc.NewTpcServer(&serverMeta) +rpcServer.SetAuthService(new(StringMatchAuthService)) + +``` + +#### 设置trace功能 + +实现 TraceService 接口 + +```go +type AddOneTraceService struct { +} + +// Trace +func (as *AddOneTraceService) Trace(service, name string, traceInfo *baidurpc.TraceInfo) *baidurpc.TraceInfo { + *traceInfo.SpanId++ + *traceInfo.TraceId++ + *traceInfo.ParentSpanId++ + return traceInfo +} + +``` + +设置到service对象 +```go + +// ... +rpcServer := baidurpc.NewTpcServer(&serverMeta) +rpcServer.SetTraceService(new(AddOneTraceService)) + +``` ### 开发RPC客户端 @@ -190,6 +242,29 @@ baidurpc是一种基于TCP协议的二进制高性能RPC通信协议实现。它 ``` +### 设置Trace功能 +```go + // 调用RPC + serviceName := "echoService" + methodName := "echo" + rpcInvocation := baidurpc.NewRpcInvocation(&serviceName, &methodName) + // 设置trace信息 + rpcInvocation.TraceId = 10 + rpcInvocation.SpanId = 11 + rpcInvocation.ParentSpanId = 12 + rpcInvocation.RpcRequestMetaExt = map[string]string{"key1": "value1"} + // 调用时,设置超时功能 + response, err := rpcClient.SendRpcRequestWithTimeout(100*time.Millisecond, rpcInvocation, ¶meterOut) + // 如果发生超时, 返回的错误码为 62 + + // 获取服务端返回的trace信息 + response.GetTraceId() + response.GetParentSpanId() + response.GetParentSpanId() + response.GetRpcRequestMetaExt() + +``` + ### 开发Ha RPC客户端 diff --git a/example/pb.go b/example/pb_example_test.go similarity index 100% rename from example/pb.go rename to example/pb_example_test.go diff --git a/rpcpackage.go b/rpcpackage.go index 1ec99f4..3fdc66a 100644 --- a/rpcpackage.go +++ b/rpcpackage.go @@ -218,6 +218,60 @@ func (r *RpcDataPackage) GetLogId() int64 { return r.Meta.Request.GetLogId() } +func (r *RpcDataPackage) TraceId(traceId int64) *RpcDataPackage { + initRequest(r) + r.Meta.Request.TraceId = &traceId + return r +} + +func (r *RpcDataPackage) GetTraceId() *int64 { + initRequest(r) + return r.Meta.Request.TraceId +} + +func (r *RpcDataPackage) SpanId(spanId int64) *RpcDataPackage { + initRequest(r) + r.Meta.Request.SpanId = &spanId + return r +} + +func (r *RpcDataPackage) GetSpanId() *int64 { + initRequest(r) + return r.Meta.Request.SpanId +} + +func (r *RpcDataPackage) ParentSpanId(parentSpanId int64) *RpcDataPackage { + initRequest(r) + r.Meta.Request.ParentSpanId = &parentSpanId + return r +} + +func (r *RpcDataPackage) GetParentSpanId() *int64 { + initRequest(r) + return r.Meta.Request.ParentSpanId +} + +func (r *RpcDataPackage) RpcRequestMetaExt(ext map[string]string) *RpcDataPackage { + initRequest(r) + extMap := make([]*RpcRequestMetaExtField, 0) + for key, value := range ext { + extfield := &RpcRequestMetaExtField{Key: key, Value: value} + extMap = append(extMap, extfield) + } + r.Meta.Request.RpcRequestMetaExt = extMap + return r +} + +func (r *RpcDataPackage) GetRpcRequestMetaExt() map[string]string { + initRequest(r) + ret := make(map[string]string) + for _, rr := range r.Meta.Request.RpcRequestMetaExt { + ret[rr.Key] = rr.Value + } + + return ret +} + func (r *RpcDataPackage) ErrorCode(errorCode int32) *RpcDataPackage { initResponse(r) diff --git a/server.go b/server.go index ca8947e..523b0b4 100644 --- a/server.go +++ b/server.go @@ -223,6 +223,18 @@ type AuthService interface { Authenticate(service, name string, authToken []byte) bool } +type TraceInfo struct { + TraceId *int64 + SpanId *int64 + ParentSpanId *int64 + RpcRequestMetaExt map[string]string +} + +// TraceService to monitor trace info and return trace info back +type TraceService interface { + Trace(service, name string, traceInfo *TraceInfo) *TraceInfo +} + // DefaultService default implemention for Service interface type DefaultService struct { sname string @@ -276,6 +288,8 @@ type TcpServer struct { authService AuthService protocol *RpcDataPackageProtocol + + traceService TraceService } type serviceMeta struct { @@ -384,6 +398,11 @@ func (s *TcpServer) SetAuthService(authservice AuthService) { s.authService = authservice } +// SetTraceService set trace service +func (s *TcpServer) SetTraceService(traceService TraceService) { + s.traceService = traceService +} + func (s *TcpServer) handleResponse(session *link.Session) { // after function return must close session defer func() { @@ -424,6 +443,26 @@ func (s *TcpServer) handleResponse(session *link.Session) { } } + if s.traceService != nil { + traceInfo := &TraceInfo{TraceId: r.GetTraceId(), SpanId: r.GetSpanId(), ParentSpanId: r.GetParentSpanId()} + traceInfo.RpcRequestMetaExt = r.GetRpcRequestMetaExt() + traceRetrun := s.traceService.Trace(serviceName, methodName, traceInfo) + if traceRetrun != nil { + if traceRetrun.TraceId != nil { + r.TraceId(*traceRetrun.TraceId) + } + if traceRetrun.SpanId != nil { + r.SpanId(*traceRetrun.SpanId) + } + if traceRetrun.ParentSpanId != nil { + r.ParentSpanId(*traceRetrun.ParentSpanId) + } + if traceRetrun.RpcRequestMetaExt != nil { + r.RpcRequestMetaExt(traceRetrun.RpcRequestMetaExt) + } + } + } + serviceId := GetServiceId(serviceName, methodName) service := s.services[serviceId]