Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix PolarisServiceWatcher bug and add ZookeeperServiceDiscovery ut #1988

Merged
merged 10 commits into from
Aug 2, 2022
59 changes: 30 additions & 29 deletions registry/polaris/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,8 @@ func newPolarisWatcher(param *api.WatchServiceRequest, consumer api.ConsumerAPI)
// AddSubscriber add subscriber into watcher's subscribers
func (watcher *PolarisServiceWatcher) AddSubscriber(subscriber func(remoting.EventType, []model.Instance)) {

watcher.lazyRun()

watcher.lock.Lock()
watcher.lazyRun()
defer watcher.lock.Unlock()

watcher.subscribers = append(watcher.subscribers, subscriber)
Expand All @@ -74,48 +73,50 @@ func (watcher *PolarisServiceWatcher) lazyRun() {

// startWatch start run work to watch target service by polaris
func (watcher *PolarisServiceWatcher) startWatch() {

for {
resp, err := watcher.consumer.WatchService(watcher.subscribeParam)
if err != nil {
time.Sleep(time.Duration(500 * time.Millisecond))
continue
}

watcher.notifyAllSubscriber(&config_center.ConfigChangeEvent{
Value: resp.GetAllInstancesResp.Instances,
ConfigType: remoting.EventTypeAdd,
})

select {
case event := <-resp.EventChannel:
eType := event.GetSubScribeEventType()
if eType == api.EventInstance {
insEvent := event.(*model.InstanceEvent)
if insEvent.AddEvent != nil {
watcher.notifyAllSubscriber(&config_center.ConfigChangeEvent{
Value: insEvent.AddEvent.Instances,
ConfigType: remoting.EventTypeAdd,
})
}
if insEvent.UpdateEvent != nil {
instances := make([]model.Instance, len(insEvent.UpdateEvent.UpdateList))
for i := range insEvent.UpdateEvent.UpdateList {
instances[i] = insEvent.UpdateEvent.UpdateList[i].After
for {
select {
case event := <-resp.EventChannel:
eType := event.GetSubScribeEventType()
if eType == api.EventInstance {
insEvent := event.(*model.InstanceEvent)

if insEvent.AddEvent != nil {
watcher.notifyAllSubscriber(&config_center.ConfigChangeEvent{
Value: insEvent.AddEvent.Instances,
ConfigType: remoting.EventTypeAdd,
})
}
if insEvent.UpdateEvent != nil {
instances := make([]model.Instance, len(insEvent.UpdateEvent.UpdateList))
for i := range insEvent.UpdateEvent.UpdateList {
instances[i] = insEvent.UpdateEvent.UpdateList[i].After
}
watcher.notifyAllSubscriber(&config_center.ConfigChangeEvent{
Value: instances,
ConfigType: remoting.EventTypeUpdate,
})
}
if insEvent.DeleteEvent != nil {
watcher.notifyAllSubscriber(&config_center.ConfigChangeEvent{
Value: insEvent.DeleteEvent.Instances,
ConfigType: remoting.EventTypeDel,
})
}
watcher.notifyAllSubscriber(&config_center.ConfigChangeEvent{
Value: instances,
ConfigType: remoting.EventTypeUpdate,
})
}
if insEvent.DeleteEvent != nil {
watcher.notifyAllSubscriber(&config_center.ConfigChangeEvent{
Value: insEvent.DeleteEvent.Instances,
ConfigType: remoting.EventTypeDel,
})
}
}
}

}
}

Expand Down
Loading