diff --git a/config/service_config.go b/config/service_config.go index b0f75cf385..fe8ba4b97a 100644 --- a/config/service_config.go +++ b/config/service_config.go @@ -138,7 +138,7 @@ func (s *ServiceConfig) Init(rc *RootConfig) error { s.ProtocolIDs = rc.Provider.ProtocolIDs } if len(s.ProtocolIDs) <= 0 { - for k, _ := range rc.Protocols { + for k := range rc.Protocols { s.ProtocolIDs = append(s.ProtocolIDs, k) } } @@ -400,7 +400,7 @@ func (s *ServiceConfig) Unexport() { s.exportersLock.Lock() defer s.exportersLock.Unlock() for _, exporter := range s.exporters { - exporter.Unexport() + exporter.UnExport() } s.exporters = nil }() diff --git a/filter/token/filter.go b/filter/token/filter.go index 2f6fa036d3..a84e9deddf 100644 --- a/filter/token/filter.go +++ b/filter/token/filter.go @@ -44,6 +44,10 @@ func init() { extension.SetFilter(constant.TokenFilterKey, newTokenFilter) } +const ( + InValidTokenFormat = "[Token Filter]Invalid token! Forbid invoke remote service %v with method %s" +) + // tokenFilter will verify if the token is valid type tokenFilter struct{} @@ -60,13 +64,31 @@ func newTokenFilter() filter.Filter { func (f *tokenFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { invokerTkn := invoker.GetURL().GetParam(constant.TokenKey, "") if len(invokerTkn) > 0 { - attachs := invocation.Attachments() - remoteTkn, exist := attachs[constant.TokenKey] - if exist && remoteTkn != nil && strings.EqualFold(invokerTkn, remoteTkn.(string)) { + attas := invocation.Attachments() + var remoteTkn string + remoteTknIface, exist := attas[constant.TokenKey] + if !exist || remoteTknIface == nil { + return &protocol.RPCResult{Err: perrors.Errorf(InValidTokenFormat, invoker, invocation.MethodName())} + } + switch remoteTknIface.(type) { + case string: + // deal with dubbo protocol + remoteTkn = remoteTknIface.(string) + case []string: + // deal with triple protocol + remoteTkns := remoteTknIface.([]string) + if len(remoteTkns) != 1 { + return &protocol.RPCResult{Err: perrors.Errorf(InValidTokenFormat, invoker, invocation.MethodName())} + } + remoteTkn = remoteTkns[0] + default: + return &protocol.RPCResult{Err: perrors.Errorf(InValidTokenFormat, invoker, invocation.MethodName())} + } + + if strings.EqualFold(invokerTkn, remoteTkn) { return invoker.Invoke(ctx, invocation) } - return &protocol.RPCResult{Err: perrors.Errorf("Invalid token! Forbid invoke remote service %v method %s ", - invoker, invocation.MethodName())} + return &protocol.RPCResult{Err: perrors.Errorf(InValidTokenFormat, invoker, invocation.MethodName())} } return invoker.Invoke(ctx, invocation) diff --git a/protocol/dubbo/dubbo_exporter.go b/protocol/dubbo/dubbo_exporter.go index 1f4008438e..ad50f309cf 100644 --- a/protocol/dubbo/dubbo_exporter.go +++ b/protocol/dubbo/dubbo_exporter.go @@ -44,11 +44,11 @@ func NewDubboExporter(key string, invoker protocol.Invoker, exporterMap *sync.Ma } // Unexport unexport dubbo service exporter. -func (de *DubboExporter) Unexport() { +func (de *DubboExporter) UnExport() { interfaceName := de.GetInvoker().GetURL().GetParam(constant.InterfaceKey, "") - de.BaseExporter.Unexport() + de.BaseExporter.UnExport() err := common.ServiceMap.UnRegister(interfaceName, DUBBO, de.GetInvoker().GetURL().ServiceKey()) if err != nil { - logger.Errorf("[DubboExporter.Unexport] error: %v", err) + logger.Errorf("[DubboExporter.UnExport] error: %v", err) } } diff --git a/protocol/dubbo/dubbo_invoker.go b/protocol/dubbo/dubbo_invoker.go index df16807eed..6f7275366c 100644 --- a/protocol/dubbo/dubbo_invoker.go +++ b/protocol/dubbo/dubbo_invoker.go @@ -36,7 +36,7 @@ import ( "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/config" "dubbo.apache.org/dubbo-go/v3/protocol" - invocation_impl "dubbo.apache.org/dubbo-go/v3/protocol/invocation" + "dubbo.apache.org/dubbo-go/v3/protocol/invocation" "dubbo.apache.org/dubbo-go/v3/remoting" ) @@ -84,7 +84,7 @@ func (di *DubboInvoker) getClient() *remoting.ExchangeClient { } // Invoke call remoting. -func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { +func (di *DubboInvoker) Invoke(ctx context.Context, ivc protocol.Invocation) protocol.Result { var ( err error result protocol.RPCResult @@ -114,7 +114,7 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati return &result } - inv := invocation.(*invocation_impl.RPCInvocation) + inv := ivc.(*invocation.RPCInvocation) // init param inv.SetAttachment(constant.PathKey, di.GetURL().GetParam(constant.InterfaceKey, "")) for _, k := range attachmentKey { @@ -142,15 +142,15 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati timeout := di.getTimeout(inv) if async { if callBack, ok := inv.CallBack().(func(response common.CallbackResponse)); ok { - result.Err = di.client.AsyncRequest(&invocation, url, timeout, callBack, rest) + result.Err = di.client.AsyncRequest(&ivc, url, timeout, callBack, rest) } else { - result.Err = di.client.Send(&invocation, url, timeout) + result.Err = di.client.Send(&ivc, url, timeout) } } else { if inv.Reply() == nil { result.Err = protocol.ErrNoReply } else { - result.Err = di.client.Request(&invocation, url, timeout, rest) + result.Err = di.client.Request(&ivc, url, timeout, rest) } } if result.Err == nil { @@ -162,21 +162,21 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati } // get timeout including methodConfig -func (di *DubboInvoker) getTimeout(invocation *invocation_impl.RPCInvocation) time.Duration { - methodName := invocation.MethodName() +func (di *DubboInvoker) getTimeout(ivc *invocation.RPCInvocation) time.Duration { + methodName := ivc.MethodName() if di.GetURL().GetParamBool(constant.GenericKey, false) { - methodName = invocation.Arguments()[0].(string) + methodName = ivc.Arguments()[0].(string) } timeout := di.GetURL().GetParam(strings.Join([]string{constant.MethodKeys, methodName, constant.TimeoutKey}, "."), "") if len(timeout) != 0 { if t, err := time.ParseDuration(timeout); err == nil { // config timeout into attachment - invocation.SetAttachment(constant.TimeoutKey, strconv.Itoa(int(t.Milliseconds()))) + ivc.SetAttachment(constant.TimeoutKey, strconv.Itoa(int(t.Milliseconds()))) return t } } // set timeout into invocation at method level - invocation.SetAttachment(constant.TimeoutKey, strconv.Itoa(int(di.timeout.Milliseconds()))) + ivc.SetAttachment(constant.TimeoutKey, strconv.Itoa(int(di.timeout.Milliseconds()))) return di.timeout } @@ -207,11 +207,11 @@ func (di *DubboInvoker) Destroy() { // Finally, I made the decision that I don't provide a general way to transfer the whole context // because it could be misused. If the context contains to many key-value pairs, the performance will be much lower. -func (di *DubboInvoker) appendCtx(ctx context.Context, inv *invocation_impl.RPCInvocation) { +func (di *DubboInvoker) appendCtx(ctx context.Context, ivc *invocation.RPCInvocation) { // inject opentracing ctx currentSpan := opentracing.SpanFromContext(ctx) if currentSpan != nil { - err := injectTraceCtx(currentSpan, inv) + err := injectTraceCtx(currentSpan, ivc) if err != nil { logger.Errorf("Could not inject the span context into attachments: %v", err) } diff --git a/protocol/dubbo/dubbo_protocol_test.go b/protocol/dubbo/dubbo_protocol_test.go index 3ccbcd1582..a7988fab2b 100644 --- a/protocol/dubbo/dubbo_protocol_test.go +++ b/protocol/dubbo/dubbo_protocol_test.go @@ -100,10 +100,10 @@ func TestDubboProtocol_Export(t *testing.T) { eq2 := exporter2.GetInvoker().GetURL().URLEqual(url2) assert.True(t, eq2) - // make sure exporterMap after 'Unexport' + // make sure exporterMap after 'UnExport' _, ok := proto.(*DubboProtocol).ExporterMap().Load(url2.ServiceKey()) assert.True(t, ok) - exporter2.Unexport() + exporter2.UnExport() _, ok = proto.(*DubboProtocol).ExporterMap().Load(url2.ServiceKey()) assert.False(t, ok) diff --git a/protocol/dubbo3/dubbo3_exporter.go b/protocol/dubbo3/dubbo3_exporter.go index 3b997b089d..13214de340 100644 --- a/protocol/dubbo3/dubbo3_exporter.go +++ b/protocol/dubbo3/dubbo3_exporter.go @@ -49,13 +49,13 @@ func NewDubboExporter(key string, invoker protocol.Invoker, exporterMap *sync.Ma } // Unexport unexport dubbo3 service exporter. -func (de *DubboExporter) Unexport() { +func (de *DubboExporter) UnExport() { url := de.GetInvoker().GetURL() interfaceName := url.GetParam(constant.InterfaceKey, "") - de.BaseExporter.Unexport() + de.BaseExporter.UnExport() err := common.ServiceMap.UnRegister(interfaceName, tripleConstant.TRIPLE, url.ServiceKey()) if err != nil { - logger.Errorf("[DubboExporter.Unexport] error: %v", err) + logger.Errorf("[DubboExporter.UnExport] error: %v", err) } de.serviceMap.Delete(interfaceName) } diff --git a/protocol/dubbo3/dubbo3_invoker.go b/protocol/dubbo3/dubbo3_invoker.go index 9e8801ed4a..11cf801da7 100644 --- a/protocol/dubbo3/dubbo3_invoker.go +++ b/protocol/dubbo3/dubbo3_invoker.go @@ -73,8 +73,8 @@ func NewDubboInvoker(url *common.URL) (*DubboInvoker, error) { interfaceKey := url.GetParam(constant.InterfaceKey, "") consumerService := config.GetConsumerServiceByInterfaceName(interfaceKey) - dubboSerializaerType := url.GetParam(constant.SerializationKey, constant.ProtobufSerialization) - triCodecType := tripleConstant.CodecType(dubboSerializaerType) + dubboSerializerType := url.GetParam(constant.SerializationKey, constant.ProtobufSerialization) + triCodecType := tripleConstant.CodecType(dubboSerializerType) // new triple client opts := []triConfig.OptionFunction{ triConfig.WithClientTimeout(uint32(timeout.Seconds())), @@ -181,6 +181,7 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati // append interface id to ctx gRPCMD := make(metadata.MD, 0) + // triple will convert attachment value to []string for k, v := range invocation.Attachments() { if str, ok := v.(string); ok { gRPCMD.Set(k, str) @@ -190,7 +191,7 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati gRPCMD.Set(k, str...) continue } - logger.Warnf("triple attachment value with key = %s is invalid, which should be string or []string", k) + logger.Warnf("[Triple Protocol]Triple attachment value with key = %s is invalid, which should be string or []string", k) } ctx = metadata.NewOutgoingContext(ctx, gRPCMD) ctx = context.WithValue(ctx, tripleConstant.InterfaceKey, di.BaseInvoker.GetURL().GetParam(constant.InterfaceKey, "")) diff --git a/protocol/dubbo3/dubbo3_protocol_test.go b/protocol/dubbo3/dubbo3_protocol_test.go index b0baa35f24..34a5c276e0 100644 --- a/protocol/dubbo3/dubbo3_protocol_test.go +++ b/protocol/dubbo3/dubbo3_protocol_test.go @@ -57,10 +57,10 @@ func TestDubboProtocolExport(t *testing.T) { eq := exporter.GetInvoker().GetURL().URLEqual(url) assert.True(t, eq) - // make sure exporterMap after 'Unexport' + // make sure exporterMap after 'UnExport' _, ok := proto.(*DubboProtocol).ExporterMap().Load(url.ServiceKey()) assert.True(t, ok) - exporter.Unexport() + exporter.UnExport() _, ok = proto.(*DubboProtocol).ExporterMap().Load(url.ServiceKey()) assert.False(t, ok) @@ -144,7 +144,7 @@ func TestDubbo3UnaryService_GetReqParamsInterfaces(t *testing.T) { func subTest(t *testing.T, val, paramsInterfaces interface{}) { list := paramsInterfaces.([]interface{}) - for k, _ := range list { + for k := range list { err := hessian.ReflectResponse(val, list[k]) assert.Nil(t, err) } diff --git a/protocol/grpc/grpc_exporter.go b/protocol/grpc/grpc_exporter.go index 464ce39f1f..c145eafacc 100644 --- a/protocol/grpc/grpc_exporter.go +++ b/protocol/grpc/grpc_exporter.go @@ -44,11 +44,11 @@ func NewGrpcExporter(key string, invoker protocol.Invoker, exporterMap *sync.Map } // Unexport and unregister gRPC service from registry and memory. -func (gg *GrpcExporter) Unexport() { +func (gg *GrpcExporter) UnExport() { interfaceName := gg.GetInvoker().GetURL().GetParam(constant.InterfaceKey, "") - gg.BaseExporter.Unexport() + gg.BaseExporter.UnExport() err := common.ServiceMap.UnRegister(interfaceName, GRPC, gg.GetInvoker().GetURL().ServiceKey()) if err != nil { - logger.Errorf("[GrpcExporter.Unexport] error: %v", err) + logger.Errorf("[GrpcExporter.UnExport] error: %v", err) } } diff --git a/protocol/grpc/grpc_protocol_test.go b/protocol/grpc/grpc_protocol_test.go index 9790843f95..499378000e 100644 --- a/protocol/grpc/grpc_protocol_test.go +++ b/protocol/grpc/grpc_protocol_test.go @@ -83,10 +83,10 @@ func TestGrpcProtocolExport(t *testing.T) { eq := exporter.GetInvoker().GetURL().URLEqual(url) assert.True(t, eq) - // make sure exporterMap after 'Unexport' + // make sure exporterMap after 'UnExport' _, ok := proto.(*GrpcProtocol).ExporterMap().Load(url.ServiceKey()) assert.True(t, ok) - exporter.Unexport() + exporter.UnExport() _, ok = proto.(*GrpcProtocol).ExporterMap().Load(url.ServiceKey()) assert.False(t, ok) diff --git a/protocol/jsonrpc/jsonrpc_exporter.go b/protocol/jsonrpc/jsonrpc_exporter.go index 6cdfb46c52..71d6b7d756 100644 --- a/protocol/jsonrpc/jsonrpc_exporter.go +++ b/protocol/jsonrpc/jsonrpc_exporter.go @@ -44,11 +44,11 @@ func NewJsonrpcExporter(key string, invoker protocol.Invoker, exporterMap *sync. } // Unexport exported JSON RPC service. -func (je *JsonrpcExporter) Unexport() { +func (je *JsonrpcExporter) UnExport() { interfaceName := je.GetInvoker().GetURL().GetParam(constant.InterfaceKey, "") - je.BaseExporter.Unexport() + je.BaseExporter.UnExport() err := common.ServiceMap.UnRegister(interfaceName, JSONRPC, je.GetInvoker().GetURL().ServiceKey()) if err != nil { - logger.Errorf("[JsonrpcExporter.Unexport] error: %v", err) + logger.Errorf("[JsonrpcExporter.UnExport] error: %v", err) } } diff --git a/protocol/jsonrpc/jsonrpc_protocol_test.go b/protocol/jsonrpc/jsonrpc_protocol_test.go index feb070a7af..6189189401 100644 --- a/protocol/jsonrpc/jsonrpc_protocol_test.go +++ b/protocol/jsonrpc/jsonrpc_protocol_test.go @@ -48,11 +48,11 @@ func TestJsonrpcProtocolExport(t *testing.T) { eq := exporter.GetInvoker().GetURL().URLEqual(url) assert.True(t, eq) - // make sure exporterMap after 'Unexport' + // make sure exporterMap after 'UnExport' fmt.Println(url.Path) _, ok := proto.(*JsonrpcProtocol).ExporterMap().Load(strings.TrimPrefix(url.Path, "/")) assert.True(t, ok) - exporter.Unexport() + exporter.UnExport() _, ok = proto.(*JsonrpcProtocol).ExporterMap().Load(strings.TrimPrefix(url.Path, "/")) assert.False(t, ok) diff --git a/protocol/protocol.go b/protocol/protocol.go index abd83c9962..a71fc8595a 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -29,7 +29,7 @@ import ( "dubbo.apache.org/dubbo-go/v3/common" ) -// Protocol is the interface that wraps the basic Export、 Refer and Destroy method. +// Protocol is the interface that wraps the basic Export, Refer and Destroy method. // // Export method is to export service for remote invocation // @@ -42,14 +42,14 @@ type Protocol interface { Destroy() } -// Exporter is the interface that wraps the basic GetInvoker method and Destroy Unexport. +// Exporter is the interface that wraps the basic GetInvoker method and Destroy UnExport. // // GetInvoker method is to get invoker. // -// Unexport method is to unexport a exported service +// UnExport is to un export an exported service type Exporter interface { GetInvoker() Invoker - Unexport() + UnExport() } // BaseProtocol is default protocol implement. @@ -105,10 +105,10 @@ func (bp *BaseProtocol) Destroy() { } bp.invokers = []Invoker{} - // unexport exporters + // un export exporters bp.exporterMap.Range(func(key, exporter interface{}) bool { if exporter != nil { - exporter.(Exporter).Unexport() + exporter.(Exporter).UnExport() } else { bp.exporterMap.Delete(key) } @@ -137,8 +137,8 @@ func (de *BaseExporter) GetInvoker() Invoker { return de.invoker } -// Unexport exported service. -func (de *BaseExporter) Unexport() { +// UnExport un export service. +func (de *BaseExporter) UnExport() { logger.Infof("Exporter unexport.") de.invoker.Destroy() de.exporterMap.Delete(de.key) diff --git a/protocol/rest/rest_exporter.go b/protocol/rest/rest_exporter.go index 9bc27da2db..91cf15d0b3 100644 --- a/protocol/rest/rest_exporter.go +++ b/protocol/rest/rest_exporter.go @@ -44,11 +44,11 @@ func NewRestExporter(key string, invoker protocol.Invoker, exporterMap *sync.Map } // Unexport unexport the RestExporter -func (re *RestExporter) Unexport() { +func (re *RestExporter) UnExport() { interfaceName := re.GetInvoker().GetURL().GetParam(constant.InterfaceKey, "") - re.BaseExporter.Unexport() + re.BaseExporter.UnExport() err := common.ServiceMap.UnRegister(interfaceName, REST, re.GetInvoker().GetURL().ServiceKey()) if err != nil { - logger.Errorf("[RestExporter.Unexport] error: %v", err) + logger.Errorf("[RestExporter.UnExport] error: %v", err) } } diff --git a/protocol/rest/rest_protocol_test.go b/protocol/rest/rest_protocol_test.go index d216217a63..1bd6762324 100644 --- a/protocol/rest/rest_protocol_test.go +++ b/protocol/rest/rest_protocol_test.go @@ -107,11 +107,11 @@ package rest // // make sure url // eq := exporter.GetInvoker().GetURL().URLEqual(url) // assert.True(t, eq) -// // make sure exporterMap after 'Unexport' +// // make sure exporterMap after 'UnExport' // fmt.Println(url.Path) // _, ok := proto.(*RestProtocol).ExporterMap().Load(strings.TrimPrefix(url.Path, "/")) // assert.True(t, ok) -// exporter.Unexport() +// exporter.UnExport() // _, ok = proto.(*RestProtocol).ExporterMap().Load(strings.TrimPrefix(url.Path, "/")) // assert.False(t, ok) // diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go index 50f60e3d0a..1b0d390d09 100644 --- a/registry/protocol/protocol.go +++ b/registry/protocol/protocol.go @@ -241,9 +241,9 @@ func (proto *registryProtocol) reExport(invoker protocol.Invoker, newUrl *common key := getCacheKey(invoker) if oldExporter, loaded := proto.bounds.Load(key); loaded { wrappedNewInvoker := newInvokerDelegate(invoker, newUrl) - oldExporter.(protocol.Exporter).Unexport() + oldExporter.(protocol.Exporter).UnExport() proto.bounds.Delete(key) - // oldExporter Unexport function unRegister rpcService from the serviceMap, so need register it again as far as possible + // oldExporter UnExport function unRegister rpcService from the serviceMap, so need register it again as far as possible if err := registerServiceMap(invoker); err != nil { logger.Error(err.Error()) } @@ -402,7 +402,7 @@ func getSubscribedOverrideUrl(providerUrl *common.URL) *common.URL { func (proto *registryProtocol) Destroy() { proto.bounds.Range(func(key, value interface{}) bool { // protocol holds the exporters actually, instead, registry holds them in order to avoid export repeatedly, so - // the work for unexport should be finished in protocol.Unexport(), see also config.destroyProviderProtocols(). + // the work for unexport should be finished in protocol.UnExport(), see also config.destroyProviderProtocols(). exporter := value.(*exporterChangeableWrapper) reg := proto.getRegistry(getRegistryUrl(exporter.originInvoker)) if err := reg.UnRegister(exporter.registerUrl); err != nil { @@ -415,7 +415,7 @@ func (proto *registryProtocol) Destroy() { go func() { select { case <-time.After(config.GetShutDown().GetStepTimeout() + config.GetShutDown().GetConsumerUpdateWaitTime()): - exporter.Unexport() + exporter.UnExport() proto.bounds.Delete(key) } }() @@ -481,8 +481,8 @@ type exporterChangeableWrapper struct { subscribeUrl *common.URL } -func (e *exporterChangeableWrapper) Unexport() { - e.exporter.Unexport() +func (e *exporterChangeableWrapper) UnExport() { + e.exporter.UnExport() } func (e *exporterChangeableWrapper) SetRegisterUrl(registerUrl *common.URL) { diff --git a/remoting/zookeeper/curator_discovery/service_discovery.go b/remoting/zookeeper/curator_discovery/service_discovery.go index 0fdc9288a4..d6bb25f902 100644 --- a/remoting/zookeeper/curator_discovery/service_discovery.go +++ b/remoting/zookeeper/curator_discovery/service_discovery.go @@ -239,7 +239,7 @@ func (sd *ServiceDiscovery) ListenServiceEvent(name string, listener remoting.Da sd.listener.ListenServiceEvent(nil, sd.pathForName(name), listener) } -// ListenServiceInstanceEvent add a listener in a instance +// ListenServiceInstanceEvent add a listener in an instance func (sd *ServiceDiscovery) ListenServiceInstanceEvent(name, id string, listener remoting.DataListener) { sd.listener.ListenServiceNodeEvent(sd.pathForInstance(name, id), listener) } diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go index 26804732bc..d3257f9cea 100644 --- a/remoting/zookeeper/listener.go +++ b/remoting/zookeeper/listener.go @@ -272,7 +272,7 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li } logger.Errorf("[Zookeeper EventListener][listenDirEvent] Get children of path {%s} with watcher failed, the error is %+v", zkRootPath, err) - // May be the provider does not ready yet, sleep failTimes * ConnDelay senconds to wait + // Maybe the provider does not ready yet, sleep failTimes * ConnDelay senconds to wait after := time.After(timeSecondDuration(failTimes * ConnDelay)) select { case <-after: