diff --git a/pkg/registry/etcd/ns_server.go b/pkg/registry/etcd/ns_server.go index 0c693be..ff8f927 100644 --- a/pkg/registry/etcd/ns_server.go +++ b/pkg/registry/etcd/ns_server.go @@ -20,6 +20,7 @@ import ( "context" "errors" "io" + "time" "github.com/golang/protobuf/ptypes/empty" "github.com/networkservicemesh/sdk/pkg/tools/log" @@ -74,38 +75,54 @@ func (n *etcdNSRegistryServer) Register(ctx context.Context, request *registry.N } func (n *etcdNSRegistryServer) watch(query *registry.NetworkServiceQuery, s registry.NetworkServiceRegistry_FindServer) error { - logger := log.FromContext(n.chainContext).WithField("etcdNSRegistryServer", "watch") - - for { - watcher, err := n.client.NetworkservicemeshV1().NetworkServices(n.ns).Watch(s.Context(), metav1.ListOptions{}) + var watchErr error + for watchErr == nil { + timeoutSeconds := int64(time.Minute / time.Second) + watcher, err := n.client.NetworkservicemeshV1().NetworkServices(n.ns).Watch(s.Context(), metav1.ListOptions{ + TimeoutSeconds: &timeoutSeconds, + }) if err != nil { return err } - var event watch.Event - for watcherOpened := true; watcherOpened; { - select { - case <-s.Context().Done(): - return s.Context().Err() - case event, watcherOpened = <-watcher.ResultChan(): - if !watcherOpened { - logger.Warn("watcher is closed, retrying") - continue - } - if event.Type != watch.Added { - continue - } - model := event.Object.(*v1.NetworkService) - item := (*registry.NetworkService)(&model.Spec) - if matchutils.MatchNetworkServices(query.NetworkService, item) { - err := s.Send(item) - if err != nil { - return err - } + watchErr = n.handleWatcher(watcher, query, s) + + watcher.Stop() + } + return watchErr +} + +func (n *etcdNSRegistryServer) handleWatcher( + watcher watch.Interface, + query *registry.NetworkServiceQuery, + s registry.NetworkServiceRegistry_FindServer, +) error { + logger := log.FromContext(n.chainContext).WithField("etcdNSRegistryServer", "handleWatcher") + + var event watch.Event + for watcherOpened := true; watcherOpened; { + select { + case <-s.Context().Done(): + return s.Context().Err() + case event, watcherOpened = <-watcher.ResultChan(): + if !watcherOpened { + logger.Warn("watcher is closed, retrying") + continue + } + if event.Type != watch.Added { + continue + } + model := event.Object.(*v1.NetworkService) + item := (*registry.NetworkService)(&model.Spec) + if matchutils.MatchNetworkServices(query.NetworkService, item) { + err := s.Send(item) + if err != nil { + return err } } } } + return nil } func (n *etcdNSRegistryServer) Find(query *registry.NetworkServiceQuery, s registry.NetworkServiceRegistry_FindServer) error { diff --git a/pkg/registry/etcd/nse_server.go b/pkg/registry/etcd/nse_server.go index 0ceb249..a62efc4 100644 --- a/pkg/registry/etcd/nse_server.go +++ b/pkg/registry/etcd/nse_server.go @@ -20,6 +20,7 @@ import ( "context" "errors" "io" + "time" "github.com/golang/protobuf/ptypes/empty" "github.com/networkservicemesh/sdk/pkg/tools/log" @@ -116,44 +117,60 @@ func (n *etcdNSERegistryServer) Unregister(ctx context.Context, request *registr } func (n *etcdNSERegistryServer) watch(query *registry.NetworkServiceEndpointQuery, s registry.NetworkServiceEndpointRegistry_FindServer) error { - logger := log.FromContext(n.chainContext).WithField("etcdNSERegistryServer", "watch") - - for { - watcher, err := n.client.NetworkservicemeshV1().NetworkServiceEndpoints(n.ns).Watch(s.Context(), metav1.ListOptions{}) + var watchErr error + for watchErr == nil { + timeoutSeconds := int64(time.Minute / time.Second) + watcher, err := n.client.NetworkservicemeshV1().NetworkServiceEndpoints(n.ns).Watch(s.Context(), metav1.ListOptions{ + TimeoutSeconds: &timeoutSeconds, + }) if err != nil { return err } - var event watch.Event - for watcherOpened := true; watcherOpened; { - select { - case <-n.chainContext.Done(): - return n.chainContext.Err() - case <-s.Context().Done(): - return s.Context().Err() - case event, watcherOpened = <-watcher.ResultChan(): - if !watcherOpened { - logger.Warn("watcher is closed, retrying") - continue - } - model, ok := event.Object.(*v1.NetworkServiceEndpoint) - if !ok { - logger.Errorf("event: %v", event) - continue - } - item := (*registry.NetworkServiceEndpoint)(&model.Spec) - if event.Type == watch.Deleted { - item.ExpirationTime.Seconds = -1 - } - if matchutils.MatchNetworkServiceEndpoints(query.NetworkServiceEndpoint, item) { - err := s.Send(item) - if err != nil { - return err - } + watchErr = n.handleWatcher(watcher, query, s) + + watcher.Stop() + } + return watchErr +} + +func (n *etcdNSERegistryServer) handleWatcher( + watcher watch.Interface, + query *registry.NetworkServiceEndpointQuery, + s registry.NetworkServiceEndpointRegistry_FindServer, +) error { + logger := log.FromContext(n.chainContext).WithField("etcdNSERegistryServer", "handleWatcher") + + var event watch.Event + for watcherOpened := true; watcherOpened; { + select { + case <-n.chainContext.Done(): + return n.chainContext.Err() + case <-s.Context().Done(): + return s.Context().Err() + case event, watcherOpened = <-watcher.ResultChan(): + if !watcherOpened { + logger.Warn("watcher is closed, retrying") + continue + } + model, ok := event.Object.(*v1.NetworkServiceEndpoint) + if !ok { + logger.Errorf("event: %v", event) + continue + } + item := (*registry.NetworkServiceEndpoint)(&model.Spec) + if event.Type == watch.Deleted { + item.ExpirationTime.Seconds = -1 + } + if matchutils.MatchNetworkServiceEndpoints(query.NetworkServiceEndpoint, item) { + err := s.Send(item) + if err != nil { + return err } } } } + return nil } // NewNetworkServiceEndpointRegistryServer creates new registry.NetworkServiceRegistryServer that is using etcd to store network services.