From 2ac672f9f9007550dfd5f2d446ea21b451ccffe0 Mon Sep 17 00:00:00 2001 From: Patrick Date: Tue, 21 Apr 2020 18:44:36 +0800 Subject: [PATCH 1/6] event_publishing_service_discovery.go init --- .../event_publishing_service_discovery.go | 90 +++++++++++++++++++ 1 file changed, 90 insertions(+) create mode 100644 registry/common/event_publishing_service_discovery.go diff --git a/registry/common/event_publishing_service_discovery.go b/registry/common/event_publishing_service_discovery.go new file mode 100644 index 0000000000..b595700ad0 --- /dev/null +++ b/registry/common/event_publishing_service_discovery.go @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package common + +import ( + gxset "github.com/dubbogo/gost/container/set" + gxpage "github.com/dubbogo/gost/page" +) + +import ( + "github.com/apache/dubbo-go/registry" +) + +type EventPublishingServiceDiscovery struct { +} + +func (epsd *EventPublishingServiceDiscovery) String() string { + panic("implement me") +} + +func (epsd *EventPublishingServiceDiscovery) Destroy() error { + panic("implement me") +} + +func (epsd *EventPublishingServiceDiscovery) Register(instance registry.ServiceInstance) error { + panic("implement me") +} + +func (epsd *EventPublishingServiceDiscovery) Update(instance registry.ServiceInstance) error { + panic("implement me") +} + +func (epsd *EventPublishingServiceDiscovery) Unregister(instance registry.ServiceInstance) error { + panic("implement me") +} + +func (epsd *EventPublishingServiceDiscovery) GetDefaultPageSize() int { + panic("implement me") +} + +func (epsd *EventPublishingServiceDiscovery) GetServices() *gxset.HashSet { + panic("implement me") +} + +func (epsd *EventPublishingServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance { + panic("implement me") +} + +func (epsd *EventPublishingServiceDiscovery) GetInstancesByPage(serviceName string, offset int, pageSize int) gxpage.Pager { + panic("implement me") +} + +func (epsd *EventPublishingServiceDiscovery) GetHealthyInstancesByPage(serviceName string, offset int, pageSize int, healthy bool) gxpage.Pager { + panic("implement me") +} + +func (epsd *EventPublishingServiceDiscovery) GetRequestInstances(serviceNames []string, offset int, requestedSize int) map[string]gxpage.Pager { + panic("implement me") +} + +func (epsd *EventPublishingServiceDiscovery) AddListener(listener *registry.ServiceInstancesChangedListener) error { + panic("implement me") +} + +func (epsd *EventPublishingServiceDiscovery) DispatchEventByServiceName(serviceName string) error { + panic("implement me") +} + +func (epsd *EventPublishingServiceDiscovery) DispatchEventForInstances(serviceName string, instances []registry.ServiceInstance) error { + panic("implement me") +} + +func (epsd *EventPublishingServiceDiscovery) DispatchEvent(event *registry.ServiceInstancesChangedEvent) error { + panic("implement me") +} From 839e0ddfa0dc6c7e80f30883e580e811ab83e678 Mon Sep 17 00:00:00 2001 From: Patrick Date: Tue, 28 Apr 2020 21:24:55 +0800 Subject: [PATCH 2/6] add EventPublishingServiceDiscovery constructor --- registry/common/event_publishing_service_discovery.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/registry/common/event_publishing_service_discovery.go b/registry/common/event_publishing_service_discovery.go index b595700ad0..8c6881f65a 100644 --- a/registry/common/event_publishing_service_discovery.go +++ b/registry/common/event_publishing_service_discovery.go @@ -26,7 +26,15 @@ import ( "github.com/apache/dubbo-go/registry" ) +// EventPublishingServiceDiscovery will enhance Service Discovery +// Publish some event about service discovery type EventPublishingServiceDiscovery struct { + serviceDiscovery *registry.ServiceDiscovery +} + +// NewEventPublishingServiceDiscovery is a constructor +func NewEventPublishingServiceDiscovery(serviceDiscovery *registry.ServiceDiscovery) *EventPublishingServiceDiscovery { + return &EventPublishingServiceDiscovery{serviceDiscovery: serviceDiscovery} } func (epsd *EventPublishingServiceDiscovery) String() string { From 99ac8c58da576ba0062fd405782f3dff8e33b90f Mon Sep 17 00:00:00 2001 From: Patrick Date: Wed, 6 May 2020 16:54:58 +0800 Subject: [PATCH 3/6] add Event --- common/observer/event.go | 2 +- .../event_publishing_service_discovery.go | 64 ++++++++++---- registry/common/service_discovery_event.go | 87 +++++++++++++++++++ registry/common/service_instance_event.go | 68 +++++++++++++++ 4 files changed, 202 insertions(+), 19 deletions(-) create mode 100644 registry/common/service_discovery_event.go create mode 100644 registry/common/service_instance_event.go diff --git a/common/observer/event.go b/common/observer/event.go index 8c3362feee..d78179043e 100644 --- a/common/observer/event.go +++ b/common/observer/event.go @@ -58,7 +58,7 @@ func (b *BaseEvent) String() string { return fmt.Sprintf("BaseEvent[source = %#v]", b.Source) } -func newBaseEvent(source interface{}) *BaseEvent { +func NewBaseEvent(source interface{}) *BaseEvent { return &BaseEvent{ Source: source, Timestamp: time.Now(), diff --git a/registry/common/event_publishing_service_discovery.go b/registry/common/event_publishing_service_discovery.go index 8c6881f65a..8c019c7aaa 100644 --- a/registry/common/event_publishing_service_discovery.go +++ b/registry/common/event_publishing_service_discovery.go @@ -18,6 +18,7 @@ package common import ( + "github.com/apache/dubbo-go/common/extension" gxset "github.com/dubbogo/gost/container/set" gxpage "github.com/dubbogo/gost/page" ) @@ -26,73 +27,100 @@ import ( "github.com/apache/dubbo-go/registry" ) +var dispatcher = extension.GetGlobalDispatcher() + // EventPublishingServiceDiscovery will enhance Service Discovery // Publish some event about service discovery type EventPublishingServiceDiscovery struct { - serviceDiscovery *registry.ServiceDiscovery + serviceDiscovery registry.ServiceDiscovery } // NewEventPublishingServiceDiscovery is a constructor -func NewEventPublishingServiceDiscovery(serviceDiscovery *registry.ServiceDiscovery) *EventPublishingServiceDiscovery { - return &EventPublishingServiceDiscovery{serviceDiscovery: serviceDiscovery} +func NewEventPublishingServiceDiscovery(serviceDiscovery registry.ServiceDiscovery) *EventPublishingServiceDiscovery { + return &EventPublishingServiceDiscovery{ + serviceDiscovery: serviceDiscovery, + } } func (epsd *EventPublishingServiceDiscovery) String() string { - panic("implement me") + return epsd.serviceDiscovery.String() } func (epsd *EventPublishingServiceDiscovery) Destroy() error { - panic("implement me") + dispatcher.Dispatch(NewServiceDiscoveryDestroyingEvent(epsd, epsd.serviceDiscovery)) + if err := epsd.serviceDiscovery.Destroy(); err != nil { + dispatcher.Dispatch(NewServiceDiscoveryExceptionEvent(epsd, epsd.serviceDiscovery, err)) + return err + } + dispatcher.Dispatch(NewServiceDiscoveryDestroyedEvent(epsd, epsd.serviceDiscovery)) + return nil } func (epsd *EventPublishingServiceDiscovery) Register(instance registry.ServiceInstance) error { - panic("implement me") + dispatcher.Dispatch(NewServiceInstancePreRegisteredEvent(epsd.serviceDiscovery, instance)) + if err := epsd.serviceDiscovery.Register(instance); err != nil { + dispatcher.Dispatch(NewServiceDiscoveryExceptionEvent(epsd, epsd.serviceDiscovery, err)) + return err + } + dispatcher.Dispatch(NewServiceInstanceRegisteredEvent(epsd.serviceDiscovery, instance)) + return nil } func (epsd *EventPublishingServiceDiscovery) Update(instance registry.ServiceInstance) error { - panic("implement me") + if err := epsd.serviceDiscovery.Update(instance); err != nil { + dispatcher.Dispatch(NewServiceDiscoveryExceptionEvent(epsd, epsd.serviceDiscovery, err)) + return err + } + return nil } func (epsd *EventPublishingServiceDiscovery) Unregister(instance registry.ServiceInstance) error { - panic("implement me") + dispatcher.Dispatch(NewServiceInstancePreUnregisteredEvent(epsd.serviceDiscovery, instance)) + if err := epsd.serviceDiscovery.Register(instance); err != nil { + dispatcher.Dispatch(NewServiceDiscoveryExceptionEvent(epsd, epsd.serviceDiscovery, err)) + return err + } + dispatcher.Dispatch(NewServiceInstanceUnregisteredEvent(epsd.serviceDiscovery, instance)) + return nil } func (epsd *EventPublishingServiceDiscovery) GetDefaultPageSize() int { - panic("implement me") + return epsd.serviceDiscovery.GetDefaultPageSize() } func (epsd *EventPublishingServiceDiscovery) GetServices() *gxset.HashSet { - panic("implement me") + return epsd.serviceDiscovery.GetServices() } func (epsd *EventPublishingServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance { - panic("implement me") + return epsd.serviceDiscovery.GetInstances(serviceName) } func (epsd *EventPublishingServiceDiscovery) GetInstancesByPage(serviceName string, offset int, pageSize int) gxpage.Pager { - panic("implement me") + return epsd.serviceDiscovery.GetInstancesByPage(serviceName, offset, pageSize) } func (epsd *EventPublishingServiceDiscovery) GetHealthyInstancesByPage(serviceName string, offset int, pageSize int, healthy bool) gxpage.Pager { - panic("implement me") + return epsd.serviceDiscovery.GetHealthyInstancesByPage(serviceName, offset, pageSize, healthy) } func (epsd *EventPublishingServiceDiscovery) GetRequestInstances(serviceNames []string, offset int, requestedSize int) map[string]gxpage.Pager { - panic("implement me") + return epsd.serviceDiscovery.GetRequestInstances(serviceNames, offset, requestedSize) } func (epsd *EventPublishingServiceDiscovery) AddListener(listener *registry.ServiceInstancesChangedListener) error { - panic("implement me") + dispatcher.AddEventListener(listener) + return epsd.serviceDiscovery.AddListener(listener) } func (epsd *EventPublishingServiceDiscovery) DispatchEventByServiceName(serviceName string) error { - panic("implement me") + return epsd.DispatchEventByServiceName(serviceName) } func (epsd *EventPublishingServiceDiscovery) DispatchEventForInstances(serviceName string, instances []registry.ServiceInstance) error { - panic("implement me") + return epsd.serviceDiscovery.DispatchEventForInstances(serviceName, instances) } func (epsd *EventPublishingServiceDiscovery) DispatchEvent(event *registry.ServiceInstancesChangedEvent) error { - panic("implement me") + return epsd.serviceDiscovery.DispatchEvent(event) } diff --git a/registry/common/service_discovery_event.go b/registry/common/service_discovery_event.go new file mode 100644 index 0000000000..4bc7928f24 --- /dev/null +++ b/registry/common/service_discovery_event.go @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package common + +import ( + "github.com/apache/dubbo-go/common/observer" + "github.com/apache/dubbo-go/registry" +) + +type ServiceDiscoveryEvent struct { + observer.BaseEvent + original registry.ServiceDiscovery + err error +} + +func NewServiceDiscoveryEventWithoutError(discovery registry.ServiceDiscovery, original registry.ServiceDiscovery) *ServiceDiscoveryEvent { + return &ServiceDiscoveryEvent{ + BaseEvent: *observer.NewBaseEvent(discovery), + original: original, + } +} + +func NewServiceDiscoveryEvent(discovery registry.ServiceDiscovery, original registry.ServiceDiscovery, err error) *ServiceDiscoveryEvent { + return &ServiceDiscoveryEvent{ + BaseEvent: *observer.NewBaseEvent(discovery), + original: original, + err: err, + } +} + +func (sde *ServiceDiscoveryEvent) GetServiceDiscovery() registry.ServiceDiscovery { + return sde.GetSource().(registry.ServiceDiscovery) +} + +func (sde *ServiceDiscoveryEvent) GetOriginal() registry.ServiceDiscovery { + return sde.original +} + +type ServiceDiscoveryDestroyingEvent ServiceDiscoveryEvent + +type ServiceDiscoveryExceptionEvent ServiceDiscoveryEvent + +type ServiceDiscoveryInitializedEvent ServiceDiscoveryEvent + +type ServiceDiscoveryInitializingEvent ServiceDiscoveryEvent + +type ServiceDiscoveryDestroyedEvent ServiceDiscoveryEvent + +// NewServiceDiscoveryDestroyingEvent create a ServiceDiscoveryDestroyingEvent +func NewServiceDiscoveryDestroyingEvent(discovery registry.ServiceDiscovery, original registry.ServiceDiscovery) *ServiceDiscoveryDestroyingEvent { + return (*ServiceDiscoveryDestroyingEvent)(NewServiceDiscoveryEventWithoutError(discovery, original)) +} + +// NewServiceDiscoveryExceptionEvent create a ServiceDiscoveryExceptionEvent +func NewServiceDiscoveryExceptionEvent(discovery registry.ServiceDiscovery, original registry.ServiceDiscovery, err error) *ServiceDiscoveryExceptionEvent { + return (*ServiceDiscoveryExceptionEvent)(NewServiceDiscoveryEvent(discovery, original, err)) +} + +// NewServiceDiscoveryInitializedEvent create a ServiceDiscoveryInitializedEvent +func NewServiceDiscoveryInitializedEvent(discovery registry.ServiceDiscovery, original registry.ServiceDiscovery) *ServiceDiscoveryInitializedEvent { + return (*ServiceDiscoveryInitializedEvent)(NewServiceDiscoveryEventWithoutError(discovery, original)) +} + +// NewServiceDiscoveryInitializingEvent create a ServiceDiscoveryInitializingEvent +func NewServiceDiscoveryInitializingEvent(discovery registry.ServiceDiscovery, original registry.ServiceDiscovery) *ServiceDiscoveryInitializingEvent { + return (*ServiceDiscoveryInitializingEvent)(NewServiceDiscoveryEventWithoutError(discovery, original)) +} + +// NewServiceDiscoveryDestroyedEvent create a ServiceDiscoveryDestroyedEvent +func NewServiceDiscoveryDestroyedEvent(discovery registry.ServiceDiscovery, original registry.ServiceDiscovery) *ServiceDiscoveryDestroyedEvent { + return (*ServiceDiscoveryDestroyedEvent)(NewServiceDiscoveryEventWithoutError(discovery, original)) +} diff --git a/registry/common/service_instance_event.go b/registry/common/service_instance_event.go new file mode 100644 index 0000000000..029fa8c536 --- /dev/null +++ b/registry/common/service_instance_event.go @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package common + +import ( + "github.com/apache/dubbo-go/common/observer" + "github.com/apache/dubbo-go/registry" +) + +type ServiceInstanceEvent struct { + observer.BaseEvent + serviceInstance registry.ServiceInstance +} + +// NewServiceInstanceEvent create a ServiceInstanceEvent +func NewServiceInstanceEvent(source interface{}, instance registry.ServiceInstance) *ServiceInstanceEvent { + return &ServiceInstanceEvent{ + BaseEvent: *observer.NewBaseEvent(source), + serviceInstance: instance, + } +} + +func (sie *ServiceInstanceEvent) getServiceInstance() registry.ServiceInstance { + return sie.serviceInstance +} + +type ServiceInstancePreRegisteredEvent ServiceInstanceEvent + +type ServiceInstancePreUnregisteredEvent ServiceInstanceEvent + +type ServiceInstanceRegisteredEvent ServiceInstanceEvent + +type ServiceInstanceUnregisteredEvent ServiceInstanceEvent + +// NewServiceInstancePreRegisteredEvent create a ServiceInstancePreRegisteredEvent +func NewServiceInstancePreRegisteredEvent(source interface{}, instance registry.ServiceInstance) *ServiceInstancePreRegisteredEvent { + return (*ServiceInstancePreRegisteredEvent)(NewServiceInstanceEvent(source, instance)) +} + +// NewServiceInstancePreUnregisteredEvent create a ServiceInstancePreUnregisteredEvent +func NewServiceInstancePreUnregisteredEvent(source interface{}, instance registry.ServiceInstance) *ServiceInstancePreUnregisteredEvent { + return (*ServiceInstancePreUnregisteredEvent)(NewServiceInstanceEvent(source, instance)) +} + +// NewServiceInstanceRegisteredEvent create a ServiceInstanceRegisteredEvent +func NewServiceInstanceRegisteredEvent(source interface{}, instance registry.ServiceInstance) *ServiceInstanceRegisteredEvent { + return (*ServiceInstanceRegisteredEvent)(NewServiceInstanceEvent(source, instance)) +} + +// NewServiceInstanceUnregisteredEvent create a ServiceInstanceUnregisteredEvent +func NewServiceInstanceUnregisteredEvent(source interface{}, instance registry.ServiceInstance) *ServiceInstanceUnregisteredEvent { + return (*ServiceInstanceUnregisteredEvent)(NewServiceInstanceEvent(source, instance)) +} From 2938f51876a0b435f289639a93026d4f528e31d5 Mon Sep 17 00:00:00 2001 From: Patrick Date: Wed, 6 May 2020 18:27:58 +0800 Subject: [PATCH 4/6] optimize code --- .../event_publishing_service_discovery.go | 53 +++++++++++-------- 1 file changed, 31 insertions(+), 22 deletions(-) diff --git a/registry/common/event_publishing_service_discovery.go b/registry/common/event_publishing_service_discovery.go index 8c019c7aaa..b34b4bc8d6 100644 --- a/registry/common/event_publishing_service_discovery.go +++ b/registry/common/event_publishing_service_discovery.go @@ -19,6 +19,7 @@ package common import ( "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/observer" gxset "github.com/dubbogo/gost/container/set" gxpage "github.com/dubbogo/gost/page" ) @@ -47,41 +48,35 @@ func (epsd *EventPublishingServiceDiscovery) String() string { } func (epsd *EventPublishingServiceDiscovery) Destroy() error { - dispatcher.Dispatch(NewServiceDiscoveryDestroyingEvent(epsd, epsd.serviceDiscovery)) - if err := epsd.serviceDiscovery.Destroy(); err != nil { - dispatcher.Dispatch(NewServiceDiscoveryExceptionEvent(epsd, epsd.serviceDiscovery, err)) - return err + f := func() error { + return epsd.serviceDiscovery.Destroy() } - dispatcher.Dispatch(NewServiceDiscoveryDestroyedEvent(epsd, epsd.serviceDiscovery)) - return nil + return epsd.executeWithEvents(NewServiceDiscoveryDestroyingEvent(epsd, epsd.serviceDiscovery), + f, NewServiceDiscoveryDestroyedEvent(epsd, epsd.serviceDiscovery)) } func (epsd *EventPublishingServiceDiscovery) Register(instance registry.ServiceInstance) error { - dispatcher.Dispatch(NewServiceInstancePreRegisteredEvent(epsd.serviceDiscovery, instance)) - if err := epsd.serviceDiscovery.Register(instance); err != nil { - dispatcher.Dispatch(NewServiceDiscoveryExceptionEvent(epsd, epsd.serviceDiscovery, err)) - return err + f := func() error { + return epsd.serviceDiscovery.Register(instance) } - dispatcher.Dispatch(NewServiceInstanceRegisteredEvent(epsd.serviceDiscovery, instance)) - return nil + return epsd.executeWithEvents(NewServiceInstancePreRegisteredEvent(epsd.serviceDiscovery, instance), + f, NewServiceInstanceRegisteredEvent(epsd.serviceDiscovery, instance)) + } func (epsd *EventPublishingServiceDiscovery) Update(instance registry.ServiceInstance) error { - if err := epsd.serviceDiscovery.Update(instance); err != nil { - dispatcher.Dispatch(NewServiceDiscoveryExceptionEvent(epsd, epsd.serviceDiscovery, err)) - return err + f := func() error { + return epsd.serviceDiscovery.Update(instance) } - return nil + return epsd.executeWithEvents(nil, f, nil) } func (epsd *EventPublishingServiceDiscovery) Unregister(instance registry.ServiceInstance) error { - dispatcher.Dispatch(NewServiceInstancePreUnregisteredEvent(epsd.serviceDiscovery, instance)) - if err := epsd.serviceDiscovery.Register(instance); err != nil { - dispatcher.Dispatch(NewServiceDiscoveryExceptionEvent(epsd, epsd.serviceDiscovery, err)) - return err + f := func() error { + return epsd.serviceDiscovery.Register(instance) } - dispatcher.Dispatch(NewServiceInstanceUnregisteredEvent(epsd.serviceDiscovery, instance)) - return nil + return epsd.executeWithEvents(NewServiceInstancePreUnregisteredEvent(epsd.serviceDiscovery, instance), + f, NewServiceInstanceUnregisteredEvent(epsd.serviceDiscovery, instance)) } func (epsd *EventPublishingServiceDiscovery) GetDefaultPageSize() int { @@ -124,3 +119,17 @@ func (epsd *EventPublishingServiceDiscovery) DispatchEventForInstances(serviceNa func (epsd *EventPublishingServiceDiscovery) DispatchEvent(event *registry.ServiceInstancesChangedEvent) error { return epsd.serviceDiscovery.DispatchEvent(event) } + +func (epsd *EventPublishingServiceDiscovery) executeWithEvents(beforeEvent observer.Event, f func() error, afterEvent observer.Event) error { + if beforeEvent != nil { + dispatcher.Dispatch(beforeEvent) + } + if err := f(); err != nil { + dispatcher.Dispatch(NewServiceDiscoveryExceptionEvent(epsd, epsd.serviceDiscovery, err)) + return err + } + if afterEvent != nil { + dispatcher.Dispatch(afterEvent) + } + return nil +} From ada27e2aca430eeb8611ef42af8bf2a00c2fd5d9 Mon Sep 17 00:00:00 2001 From: Patrick Date: Fri, 8 May 2020 11:05:35 +0800 Subject: [PATCH 5/6] add unit tests and modify event struct --- ...vent_publishing_service_deiscovery_test.go | 161 ++++++++++++++++++ .../event_publishing_service_discovery.go | 28 +-- registry/common/service_discovery_event.go | 52 +++--- registry/common/service_instance_event.go | 32 +++- 4 files changed, 234 insertions(+), 39 deletions(-) create mode 100644 registry/common/event_publishing_service_deiscovery_test.go diff --git a/registry/common/event_publishing_service_deiscovery_test.go b/registry/common/event_publishing_service_deiscovery_test.go new file mode 100644 index 0000000000..856a902d1c --- /dev/null +++ b/registry/common/event_publishing_service_deiscovery_test.go @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package common + +import ( + "reflect" + "testing" +) + +import ( + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" +) + +import ( + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/observer" + dispatcher2 "github.com/apache/dubbo-go/common/observer/dispatcher" + "github.com/apache/dubbo-go/registry" + gxset "github.com/dubbogo/gost/container/set" + gxpage "github.com/dubbogo/gost/page" +) + +func TestEventPublishingServiceDiscovery_DispatchEvent(t *testing.T) { + dc := NewEventPublishingServiceDiscovery(&ServiceDiscoveryA{}) + tsd := &TestServiceDiscoveryDestroyingEventListener{} + tsd.SetT(t) + tsi := &TestServiceInstancePreRegisteredEventListener{} + tsi.SetT(t) + extension.AddEventListener(tsd) + extension.AddEventListener(tsi) + extension.SetEventDispatcher("direct", dispatcher2.NewDirectEventDispatcher) + extension.SetAndInitGlobalDispatcher("direct") + err := dc.Destroy() + assert.Nil(t, err) + si := ®istry.DefaultServiceInstance{Id: "testServiceInstance"} + err = dc.Register(si) + assert.Nil(t, err) + +} + +type TestServiceDiscoveryDestroyingEventListener struct { + suite.Suite + observer.BaseListenable +} + +func (tel *TestServiceDiscoveryDestroyingEventListener) OnEvent(e observer.Event) error { + e1, ok := e.(*ServiceDiscoveryDestroyingEvent) + assert.Equal(tel.T(), ok, true) + assert.Equal(tel.T(), "testServiceDiscovery", e1.GetOriginal().String()) + assert.Equal(tel.T(), "testServiceDiscovery", e1.GetServiceDiscovery().String()) + return nil +} + +func (tel *TestServiceDiscoveryDestroyingEventListener) GetPriority() int { + return -1 +} + +func (tel *TestServiceDiscoveryDestroyingEventListener) GetEventType() reflect.Type { + return reflect.TypeOf(ServiceDiscoveryDestroyingEvent{}) +} + +type TestServiceInstancePreRegisteredEventListener struct { + suite.Suite + observer.BaseListenable +} + +func (tel *TestServiceInstancePreRegisteredEventListener) OnEvent(e observer.Event) error { + e1, ok := e.(*ServiceInstancePreRegisteredEvent) + assert.Equal(tel.T(), ok, true) + assert.Equal(tel.T(), "testServiceInstance", e1.getServiceInstance().GetId()) + return nil +} + +func (tel *TestServiceInstancePreRegisteredEventListener) GetPriority() int { + return -1 +} + +func (tel *TestServiceInstancePreRegisteredEventListener) GetEventType() reflect.Type { + return reflect.TypeOf(ServiceInstancePreRegisteredEvent{}) +} + +type ServiceDiscoveryA struct { +} + +// String return mockServiceDiscovery +func (msd *ServiceDiscoveryA) String() string { + return "testServiceDiscovery" +} + +// Destroy do nothing +func (msd *ServiceDiscoveryA) Destroy() error { + return nil +} + +func (msd *ServiceDiscoveryA) Register(instance registry.ServiceInstance) error { + return nil +} + +func (msd *ServiceDiscoveryA) Update(instance registry.ServiceInstance) error { + return nil +} + +func (msd *ServiceDiscoveryA) Unregister(instance registry.ServiceInstance) error { + return nil +} + +func (msd *ServiceDiscoveryA) GetDefaultPageSize() int { + return 1 +} + +func (msd *ServiceDiscoveryA) GetServices() *gxset.HashSet { + return nil +} + +func (msd *ServiceDiscoveryA) GetInstances(serviceName string) []registry.ServiceInstance { + return nil +} + +func (msd *ServiceDiscoveryA) GetInstancesByPage(serviceName string, offset int, pageSize int) gxpage.Pager { + return nil +} + +func (msd *ServiceDiscoveryA) GetHealthyInstancesByPage(serviceName string, offset int, pageSize int, healthy bool) gxpage.Pager { + return nil +} + +func (msd *ServiceDiscoveryA) GetRequestInstances(serviceNames []string, offset int, requestedSize int) map[string]gxpage.Pager { + return nil +} + +func (msd *ServiceDiscoveryA) AddListener(listener *registry.ServiceInstancesChangedListener) error { + return nil +} + +func (msd *ServiceDiscoveryA) DispatchEventByServiceName(serviceName string) error { + return nil +} + +func (msd *ServiceDiscoveryA) DispatchEventForInstances(serviceName string, instances []registry.ServiceInstance) error { + return nil +} + +func (msd *ServiceDiscoveryA) DispatchEvent(event *registry.ServiceInstancesChangedEvent) error { + return nil +} diff --git a/registry/common/event_publishing_service_discovery.go b/registry/common/event_publishing_service_discovery.go index b34b4bc8d6..43d2917c32 100644 --- a/registry/common/event_publishing_service_discovery.go +++ b/registry/common/event_publishing_service_discovery.go @@ -17,6 +17,10 @@ package common +import ( + "github.com/apache/dubbo-go/registry" +) + import ( "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/observer" @@ -24,12 +28,6 @@ import ( gxpage "github.com/dubbogo/gost/page" ) -import ( - "github.com/apache/dubbo-go/registry" -) - -var dispatcher = extension.GetGlobalDispatcher() - // EventPublishingServiceDiscovery will enhance Service Discovery // Publish some event about service discovery type EventPublishingServiceDiscovery struct { @@ -43,10 +41,12 @@ func NewEventPublishingServiceDiscovery(serviceDiscovery registry.ServiceDiscove } } +// String func (epsd *EventPublishingServiceDiscovery) String() string { return epsd.serviceDiscovery.String() } +// Destroy delegate function func (epsd *EventPublishingServiceDiscovery) Destroy() error { f := func() error { return epsd.serviceDiscovery.Destroy() @@ -55,6 +55,7 @@ func (epsd *EventPublishingServiceDiscovery) Destroy() error { f, NewServiceDiscoveryDestroyedEvent(epsd, epsd.serviceDiscovery)) } +// Register delegate function func (epsd *EventPublishingServiceDiscovery) Register(instance registry.ServiceInstance) error { f := func() error { return epsd.serviceDiscovery.Register(instance) @@ -64,6 +65,7 @@ func (epsd *EventPublishingServiceDiscovery) Register(instance registry.ServiceI } +// Update delegate function func (epsd *EventPublishingServiceDiscovery) Update(instance registry.ServiceInstance) error { f := func() error { return epsd.serviceDiscovery.Update(instance) @@ -71,9 +73,10 @@ func (epsd *EventPublishingServiceDiscovery) Update(instance registry.ServiceIns return epsd.executeWithEvents(nil, f, nil) } +// Unregister delegate function func (epsd *EventPublishingServiceDiscovery) Unregister(instance registry.ServiceInstance) error { f := func() error { - return epsd.serviceDiscovery.Register(instance) + return epsd.serviceDiscovery.Unregister(instance) } return epsd.executeWithEvents(NewServiceInstancePreUnregisteredEvent(epsd.serviceDiscovery, instance), f, NewServiceInstanceUnregisteredEvent(epsd.serviceDiscovery, instance)) @@ -103,8 +106,9 @@ func (epsd *EventPublishingServiceDiscovery) GetRequestInstances(serviceNames [] return epsd.serviceDiscovery.GetRequestInstances(serviceNames, offset, requestedSize) } +// AddListener add event listener func (epsd *EventPublishingServiceDiscovery) AddListener(listener *registry.ServiceInstancesChangedListener) error { - dispatcher.AddEventListener(listener) + extension.GetGlobalDispatcher().AddEventListener(listener) return epsd.serviceDiscovery.AddListener(listener) } @@ -120,16 +124,18 @@ func (epsd *EventPublishingServiceDiscovery) DispatchEvent(event *registry.Servi return epsd.serviceDiscovery.DispatchEvent(event) } +// executeWithEvents dispatch before event and after event if return error will dispatch exception event func (epsd *EventPublishingServiceDiscovery) executeWithEvents(beforeEvent observer.Event, f func() error, afterEvent observer.Event) error { + globalDispatcher := extension.GetGlobalDispatcher() if beforeEvent != nil { - dispatcher.Dispatch(beforeEvent) + globalDispatcher.Dispatch(beforeEvent) } if err := f(); err != nil { - dispatcher.Dispatch(NewServiceDiscoveryExceptionEvent(epsd, epsd.serviceDiscovery, err)) + globalDispatcher.Dispatch(NewServiceDiscoveryExceptionEvent(epsd, epsd.serviceDiscovery, err)) return err } if afterEvent != nil { - dispatcher.Dispatch(afterEvent) + globalDispatcher.Dispatch(afterEvent) } return nil } diff --git a/registry/common/service_discovery_event.go b/registry/common/service_discovery_event.go index 4bc7928f24..a60ca56a39 100644 --- a/registry/common/service_discovery_event.go +++ b/registry/common/service_discovery_event.go @@ -25,24 +25,15 @@ import ( type ServiceDiscoveryEvent struct { observer.BaseEvent original registry.ServiceDiscovery - err error } -func NewServiceDiscoveryEventWithoutError(discovery registry.ServiceDiscovery, original registry.ServiceDiscovery) *ServiceDiscoveryEvent { +func NewServiceDiscoveryEvent(discovery registry.ServiceDiscovery, original registry.ServiceDiscovery) *ServiceDiscoveryEvent { return &ServiceDiscoveryEvent{ BaseEvent: *observer.NewBaseEvent(discovery), original: original, } } -func NewServiceDiscoveryEvent(discovery registry.ServiceDiscovery, original registry.ServiceDiscovery, err error) *ServiceDiscoveryEvent { - return &ServiceDiscoveryEvent{ - BaseEvent: *observer.NewBaseEvent(discovery), - original: original, - err: err, - } -} - func (sde *ServiceDiscoveryEvent) GetServiceDiscovery() registry.ServiceDiscovery { return sde.GetSource().(registry.ServiceDiscovery) } @@ -51,37 +42,58 @@ func (sde *ServiceDiscoveryEvent) GetOriginal() registry.ServiceDiscovery { return sde.original } -type ServiceDiscoveryDestroyingEvent ServiceDiscoveryEvent +// ServiceDiscoveryDestroyingEvent +// this event will be dispatched before service discovery be destroyed +type ServiceDiscoveryDestroyingEvent struct { + ServiceDiscoveryEvent +} -type ServiceDiscoveryExceptionEvent ServiceDiscoveryEvent +// ServiceDiscoveryExceptionEvent +// this event will be dispatched when the error occur in service discovery +type ServiceDiscoveryExceptionEvent struct { + ServiceDiscoveryEvent + err error +} -type ServiceDiscoveryInitializedEvent ServiceDiscoveryEvent +// ServiceDiscoveryInitializedEvent +// this event will be dispatched after service discovery initialize +type ServiceDiscoveryInitializedEvent struct { + ServiceDiscoveryEvent +} -type ServiceDiscoveryInitializingEvent ServiceDiscoveryEvent +// ServiceDiscoveryInitializingEvent +// this event will be dispatched before service discovery initialize +type ServiceDiscoveryInitializingEvent struct { + ServiceDiscoveryEvent +} -type ServiceDiscoveryDestroyedEvent ServiceDiscoveryEvent +// ServiceDiscoveryDestroyedEvent +// this event will be dispatched after service discovery be destroyed +type ServiceDiscoveryDestroyedEvent struct { + ServiceDiscoveryEvent +} // NewServiceDiscoveryDestroyingEvent create a ServiceDiscoveryDestroyingEvent func NewServiceDiscoveryDestroyingEvent(discovery registry.ServiceDiscovery, original registry.ServiceDiscovery) *ServiceDiscoveryDestroyingEvent { - return (*ServiceDiscoveryDestroyingEvent)(NewServiceDiscoveryEventWithoutError(discovery, original)) + return &ServiceDiscoveryDestroyingEvent{*NewServiceDiscoveryEvent(discovery, original)} } // NewServiceDiscoveryExceptionEvent create a ServiceDiscoveryExceptionEvent func NewServiceDiscoveryExceptionEvent(discovery registry.ServiceDiscovery, original registry.ServiceDiscovery, err error) *ServiceDiscoveryExceptionEvent { - return (*ServiceDiscoveryExceptionEvent)(NewServiceDiscoveryEvent(discovery, original, err)) + return &ServiceDiscoveryExceptionEvent{*NewServiceDiscoveryEvent(discovery, original), err} } // NewServiceDiscoveryInitializedEvent create a ServiceDiscoveryInitializedEvent func NewServiceDiscoveryInitializedEvent(discovery registry.ServiceDiscovery, original registry.ServiceDiscovery) *ServiceDiscoveryInitializedEvent { - return (*ServiceDiscoveryInitializedEvent)(NewServiceDiscoveryEventWithoutError(discovery, original)) + return &ServiceDiscoveryInitializedEvent{*NewServiceDiscoveryEvent(discovery, original)} } // NewServiceDiscoveryInitializingEvent create a ServiceDiscoveryInitializingEvent func NewServiceDiscoveryInitializingEvent(discovery registry.ServiceDiscovery, original registry.ServiceDiscovery) *ServiceDiscoveryInitializingEvent { - return (*ServiceDiscoveryInitializingEvent)(NewServiceDiscoveryEventWithoutError(discovery, original)) + return &ServiceDiscoveryInitializingEvent{*NewServiceDiscoveryEvent(discovery, original)} } // NewServiceDiscoveryDestroyedEvent create a ServiceDiscoveryDestroyedEvent func NewServiceDiscoveryDestroyedEvent(discovery registry.ServiceDiscovery, original registry.ServiceDiscovery) *ServiceDiscoveryDestroyedEvent { - return (*ServiceDiscoveryDestroyedEvent)(NewServiceDiscoveryEventWithoutError(discovery, original)) + return &ServiceDiscoveryDestroyedEvent{*NewServiceDiscoveryEvent(discovery, original)} } diff --git a/registry/common/service_instance_event.go b/registry/common/service_instance_event.go index 029fa8c536..f70e7ee0ff 100644 --- a/registry/common/service_instance_event.go +++ b/registry/common/service_instance_event.go @@ -39,30 +39,46 @@ func (sie *ServiceInstanceEvent) getServiceInstance() registry.ServiceInstance { return sie.serviceInstance } -type ServiceInstancePreRegisteredEvent ServiceInstanceEvent +// ServiceInstancePreRegisteredEvent +// this event will be dispatched before service instance be registered +type ServiceInstancePreRegisteredEvent struct { + ServiceInstanceEvent +} -type ServiceInstancePreUnregisteredEvent ServiceInstanceEvent +// ServiceInstancePreUnregisteredEvent +// this event will be dispatched before service instance be unregistered +type ServiceInstancePreUnregisteredEvent struct { + ServiceInstanceEvent +} -type ServiceInstanceRegisteredEvent ServiceInstanceEvent +// ServiceInstanceRegisteredEvent +// this event will be dispatched after service instance be registered +type ServiceInstanceRegisteredEvent struct { + ServiceInstanceEvent +} -type ServiceInstanceUnregisteredEvent ServiceInstanceEvent +// ServiceInstanceRegisteredEvent +// this event will be dispatched after service instance be unregistered +type ServiceInstanceUnregisteredEvent struct { + ServiceInstanceEvent +} // NewServiceInstancePreRegisteredEvent create a ServiceInstancePreRegisteredEvent func NewServiceInstancePreRegisteredEvent(source interface{}, instance registry.ServiceInstance) *ServiceInstancePreRegisteredEvent { - return (*ServiceInstancePreRegisteredEvent)(NewServiceInstanceEvent(source, instance)) + return &ServiceInstancePreRegisteredEvent{*NewServiceInstanceEvent(source, instance)} } // NewServiceInstancePreUnregisteredEvent create a ServiceInstancePreUnregisteredEvent func NewServiceInstancePreUnregisteredEvent(source interface{}, instance registry.ServiceInstance) *ServiceInstancePreUnregisteredEvent { - return (*ServiceInstancePreUnregisteredEvent)(NewServiceInstanceEvent(source, instance)) + return &ServiceInstancePreUnregisteredEvent{*NewServiceInstanceEvent(source, instance)} } // NewServiceInstanceRegisteredEvent create a ServiceInstanceRegisteredEvent func NewServiceInstanceRegisteredEvent(source interface{}, instance registry.ServiceInstance) *ServiceInstanceRegisteredEvent { - return (*ServiceInstanceRegisteredEvent)(NewServiceInstanceEvent(source, instance)) + return &ServiceInstanceRegisteredEvent{*NewServiceInstanceEvent(source, instance)} } // NewServiceInstanceUnregisteredEvent create a ServiceInstanceUnregisteredEvent func NewServiceInstanceUnregisteredEvent(source interface{}, instance registry.ServiceInstance) *ServiceInstanceUnregisteredEvent { - return (*ServiceInstanceUnregisteredEvent)(NewServiceInstanceEvent(source, instance)) + return &ServiceInstanceUnregisteredEvent{*NewServiceInstanceEvent(source, instance)} } From 9f7e1f3c5a8f9b50d2e9e2eebf05f2e59b679646 Mon Sep 17 00:00:00 2001 From: Patrick Date: Fri, 8 May 2020 11:11:44 +0800 Subject: [PATCH 6/6] fix imports --- registry/common/event_publishing_service_deiscovery_test.go | 4 ++-- registry/common/event_publishing_service_discovery.go | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/registry/common/event_publishing_service_deiscovery_test.go b/registry/common/event_publishing_service_deiscovery_test.go index 856a902d1c..1e08335e04 100644 --- a/registry/common/event_publishing_service_deiscovery_test.go +++ b/registry/common/event_publishing_service_deiscovery_test.go @@ -23,6 +23,8 @@ import ( ) import ( + gxset "github.com/dubbogo/gost/container/set" + gxpage "github.com/dubbogo/gost/page" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" ) @@ -32,8 +34,6 @@ import ( "github.com/apache/dubbo-go/common/observer" dispatcher2 "github.com/apache/dubbo-go/common/observer/dispatcher" "github.com/apache/dubbo-go/registry" - gxset "github.com/dubbogo/gost/container/set" - gxpage "github.com/dubbogo/gost/page" ) func TestEventPublishingServiceDiscovery_DispatchEvent(t *testing.T) { diff --git a/registry/common/event_publishing_service_discovery.go b/registry/common/event_publishing_service_discovery.go index 43d2917c32..f61dd84690 100644 --- a/registry/common/event_publishing_service_discovery.go +++ b/registry/common/event_publishing_service_discovery.go @@ -18,14 +18,14 @@ package common import ( - "github.com/apache/dubbo-go/registry" + gxset "github.com/dubbogo/gost/container/set" + gxpage "github.com/dubbogo/gost/page" ) import ( "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/observer" - gxset "github.com/dubbogo/gost/container/set" - gxpage "github.com/dubbogo/gost/page" + "github.com/apache/dubbo-go/registry" ) // EventPublishingServiceDiscovery will enhance Service Discovery