Skip to content

Commit

Permalink
fix PolarisServiceWatcher bug (#1988)
Browse files Browse the repository at this point in the history
Bumps [google.golang.org/grpc](https://github.com/grpc/grpc-go) from 1.47.0 to 1.48.0.
- [Release notes](https://github.com/grpc/grpc-go/releases)
- [Commits](grpc/grpc-go@v1.47.0...v1.48.0)

---
updated-dependencies:
- dependency-name: google.golang.org/grpc
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
  • Loading branch information
jasondeng1997 and dependabot[bot] authored Aug 2, 2022
1 parent 9334da0 commit 9d88f31
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 291 deletions.
59 changes: 30 additions & 29 deletions registry/polaris/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,8 @@ func newPolarisWatcher(param *api.WatchServiceRequest, consumer api.ConsumerAPI)
// AddSubscriber add subscriber into watcher's subscribers
func (watcher *PolarisServiceWatcher) AddSubscriber(subscriber func(remoting.EventType, []model.Instance)) {

watcher.lazyRun()

watcher.lock.Lock()
watcher.lazyRun()
defer watcher.lock.Unlock()

watcher.subscribers = append(watcher.subscribers, subscriber)
Expand All @@ -74,48 +73,50 @@ func (watcher *PolarisServiceWatcher) lazyRun() {

// startWatch start run work to watch target service by polaris
func (watcher *PolarisServiceWatcher) startWatch() {

for {
resp, err := watcher.consumer.WatchService(watcher.subscribeParam)
if err != nil {
time.Sleep(time.Duration(500 * time.Millisecond))
continue
}

watcher.notifyAllSubscriber(&config_center.ConfigChangeEvent{
Value: resp.GetAllInstancesResp.Instances,
ConfigType: remoting.EventTypeAdd,
})

select {
case event := <-resp.EventChannel:
eType := event.GetSubScribeEventType()
if eType == api.EventInstance {
insEvent := event.(*model.InstanceEvent)
if insEvent.AddEvent != nil {
watcher.notifyAllSubscriber(&config_center.ConfigChangeEvent{
Value: insEvent.AddEvent.Instances,
ConfigType: remoting.EventTypeAdd,
})
}
if insEvent.UpdateEvent != nil {
instances := make([]model.Instance, len(insEvent.UpdateEvent.UpdateList))
for i := range insEvent.UpdateEvent.UpdateList {
instances[i] = insEvent.UpdateEvent.UpdateList[i].After
for {
select {
case event := <-resp.EventChannel:
eType := event.GetSubScribeEventType()
if eType == api.EventInstance {
insEvent := event.(*model.InstanceEvent)

if insEvent.AddEvent != nil {
watcher.notifyAllSubscriber(&config_center.ConfigChangeEvent{
Value: insEvent.AddEvent.Instances,
ConfigType: remoting.EventTypeAdd,
})
}
if insEvent.UpdateEvent != nil {
instances := make([]model.Instance, len(insEvent.UpdateEvent.UpdateList))
for i := range insEvent.UpdateEvent.UpdateList {
instances[i] = insEvent.UpdateEvent.UpdateList[i].After
}
watcher.notifyAllSubscriber(&config_center.ConfigChangeEvent{
Value: instances,
ConfigType: remoting.EventTypeUpdate,
})
}
if insEvent.DeleteEvent != nil {
watcher.notifyAllSubscriber(&config_center.ConfigChangeEvent{
Value: insEvent.DeleteEvent.Instances,
ConfigType: remoting.EventTypeDel,
})
}
watcher.notifyAllSubscriber(&config_center.ConfigChangeEvent{
Value: instances,
ConfigType: remoting.EventTypeUpdate,
})
}
if insEvent.DeleteEvent != nil {
watcher.notifyAllSubscriber(&config_center.ConfigChangeEvent{
Value: insEvent.DeleteEvent.Instances,
ConfigType: remoting.EventTypeDel,
})
}
}
}

}
}

Expand Down
Loading

0 comments on commit 9d88f31

Please sign in to comment.