diff --git a/registry/directory/directory.go b/registry/directory/directory.go index 2b0ff056a8..7974ad4be6 100644 --- a/registry/directory/directory.go +++ b/registry/directory/directory.go @@ -193,7 +193,7 @@ func (dir *RegistryDirectory) refreshAllInvokers(events []*registry.ServiceEvent if event != nil && event.Service != nil && constant.ROUTER_PROTOCOL == event.Service.Protocol { dir.configRouters() } - if oldInvoker, _ := dir.doCacheInvoker(event.Service); oldInvoker != nil { + if oldInvoker, _ := dir.doCacheInvoker(event.Service, event); oldInvoker != nil { oldInvokers = append(oldInvokers, oldInvoker) } } @@ -224,7 +224,7 @@ func (dir *RegistryDirectory) invokerCacheKey(event *registry.ServiceEvent) stri referenceUrl := dir.GetDirectoryUrl().SubURL newUrl := common.MergeURL(event.Service, referenceUrl) event.Update(newUrl) - return newUrl.GetCacheInvokerMapKey() + return event.Key() } // setNewInvokers groups the invokers from the cache first, then set the result to both directory and router chain. @@ -240,17 +240,18 @@ func (dir *RegistryDirectory) setNewInvokers() { func (dir *RegistryDirectory) cacheInvokerByEvent(event *registry.ServiceEvent) (protocol.Invoker, error) { // judge is override or others if event != nil { - u := dir.convertUrl(event) + switch event.Action { case remoting.EventTypeAdd, remoting.EventTypeUpdate: + u := dir.convertUrl(event) logger.Infof("selector add service url{%s}", event.Service) if u != nil && constant.ROUTER_PROTOCOL == u.Protocol { dir.configRouters() } - return dir.cacheInvoker(u), nil + return dir.cacheInvoker(u, event), nil case remoting.EventTypeDel: logger.Infof("selector delete service url{%s}", event.Service) - return dir.uncacheInvoker(u), nil + return dir.uncacheInvoker(event), nil default: return nil, fmt.Errorf("illegal event type: %v", event.Action) } @@ -316,8 +317,8 @@ func (dir *RegistryDirectory) toGroupInvokers() []protocol.Invoker { } // uncacheInvoker will return abandoned Invoker, if no Invoker to be abandoned, return nil -func (dir *RegistryDirectory) uncacheInvoker(url *common.URL) protocol.Invoker { - return dir.uncacheInvokerWithKey(url.GetCacheInvokerMapKey()) +func (dir *RegistryDirectory) uncacheInvoker(event *registry.ServiceEvent) protocol.Invoker { + return dir.uncacheInvokerWithKey(event.Key()) } func (dir *RegistryDirectory) uncacheInvokerWithKey(key string) protocol.Invoker { @@ -331,7 +332,7 @@ func (dir *RegistryDirectory) uncacheInvokerWithKey(key string) protocol.Invoker } // cacheInvoker will return abandoned Invoker,if no Invoker to be abandoned,return nil -func (dir *RegistryDirectory) cacheInvoker(url *common.URL) protocol.Invoker { +func (dir *RegistryDirectory) cacheInvoker(url *common.URL, event *registry.ServiceEvent) protocol.Invoker { dir.overrideUrl(dir.GetDirectoryUrl()) referenceUrl := dir.GetDirectoryUrl().SubURL @@ -348,15 +349,16 @@ func (dir *RegistryDirectory) cacheInvoker(url *common.URL) protocol.Invoker { if url.Protocol == referenceUrl.Protocol || referenceUrl.Protocol == "" { newUrl := common.MergeURL(url, referenceUrl) dir.overrideUrl(newUrl) - if v, ok := dir.doCacheInvoker(newUrl); ok { + event.Update(newUrl) + if v, ok := dir.doCacheInvoker(newUrl, event); ok { return v } } return nil } -func (dir *RegistryDirectory) doCacheInvoker(newUrl *common.URL) (protocol.Invoker, bool) { - key := newUrl.GetCacheInvokerMapKey() +func (dir *RegistryDirectory) doCacheInvoker(newUrl *common.URL, event *registry.ServiceEvent) (protocol.Invoker, bool) { + key := event.Key() if cacheInvoker, ok := dir.cacheInvokersMap.Load(key); !ok { logger.Debugf("service will be added in cache invokers: invokers url is %s!", newUrl) newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(newUrl) diff --git a/registry/event.go b/registry/event.go index 41a899549e..76a9600a42 100644 --- a/registry/event.go +++ b/registry/event.go @@ -29,6 +29,8 @@ import ( "dubbo.apache.org/dubbo-go/v3/remoting" ) +type KeyFunc func(*common.URL) string + func init() { rand.Seed(time.Now().UnixNano()) } @@ -45,6 +47,7 @@ type ServiceEvent struct { key string // If the url is updated, such as Merged. updated bool + KeyFunc KeyFunc } // String return the description of event @@ -69,7 +72,11 @@ func (e *ServiceEvent) Key() string { if len(e.key) > 0 { return e.key } - e.key = e.Service.GetCacheInvokerMapKey() + if e.KeyFunc == nil { + e.key = e.Service.GetCacheInvokerMapKey() + } else { + e.key = e.KeyFunc(e.Service) + } return e.key } diff --git a/registry/event_test.go b/registry/event_test.go new file mode 100644 index 0000000000..349dc3d476 --- /dev/null +++ b/registry/event_test.go @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package registry + +import ( + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "dubbo.apache.org/dubbo-go/v3/common" +) + +func TestKey(t *testing.T) { + u1, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.0") + se := ServiceEvent{ + Service: u1, + } + assert.Equal(t, se.Key(), "dubbo://:@127.0.0.1:20000/?interface=com.ikurento.user.UserProvider&group=&version=2.0×tamp=") + + se2 := ServiceEvent{ + Service: u1, + KeyFunc: defineKey, + } + assert.Equal(t, se2.Key(), "Hello Key") +} + +func defineKey(url *common.URL) string { + return "Hello Key" +}