diff --git a/vendor/k8s.io/kubernetes/pkg/controller/volume/persistentvolume/BUILD b/vendor/k8s.io/kubernetes/pkg/controller/volume/persistentvolume/BUILD index 0d6da56ec02c..7dcf18a296cc 100644 --- a/vendor/k8s.io/kubernetes/pkg/controller/volume/persistentvolume/BUILD +++ b/vendor/k8s.io/kubernetes/pkg/controller/volume/persistentvolume/BUILD @@ -36,6 +36,7 @@ go_library( "//pkg/util/io:go_default_library", "//pkg/util/mount:go_default_library", "//pkg/volume:go_default_library", + "//pkg/volume/util:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library", diff --git a/vendor/k8s.io/kubernetes/pkg/controller/volume/persistentvolume/pv_controller.go b/vendor/k8s.io/kubernetes/pkg/controller/volume/persistentvolume/pv_controller.go index 97ef17f6b7a0..39d25bca0c92 100644 --- a/vendor/k8s.io/kubernetes/pkg/controller/volume/persistentvolume/pv_controller.go +++ b/vendor/k8s.io/kubernetes/pkg/controller/volume/persistentvolume/pv_controller.go @@ -39,6 +39,7 @@ import ( "k8s.io/kubernetes/pkg/util/goroutinemap" "k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff" vol "k8s.io/kubernetes/pkg/volume" + "k8s.io/kubernetes/pkg/volume/util" "github.com/golang/glog" ) @@ -1215,7 +1216,10 @@ func (ctrl *PersistentVolumeController) doDeleteVolume(volume *v1.PersistentVolu return false, fmt.Errorf("Failed to create deleter for volume %q: %v", volume.Name, err) } - if err = deleter.Delete(); err != nil { + opComplete := util.OperationCompleteHook(plugin.GetPluginName(), "volume_delete") + err = deleter.Delete() + opComplete(err) + if err != nil { // Deleter failed return false, err } @@ -1325,7 +1329,9 @@ func (ctrl *PersistentVolumeController) provisionClaimOperation(claimObj interfa return } + opComplete := util.OperationCompleteHook(plugin.GetPluginName(), "volume_provision") volume, err = provisioner.Provision() + opComplete(err) if err != nil { strerr := fmt.Sprintf("Failed to provision volume with StorageClass %q: %v", storageClass.Name, err) glog.V(2).Infof("failed to provision volume for claim %q with StorageClass %q: %v", claimToClaimKey(claim), storageClass.Name, err) diff --git a/vendor/k8s.io/kubernetes/pkg/volume/util/BUILD b/vendor/k8s.io/kubernetes/pkg/volume/util/BUILD index c2ff2f56693b..ddaade57f51b 100644 --- a/vendor/k8s.io/kubernetes/pkg/volume/util/BUILD +++ b/vendor/k8s.io/kubernetes/pkg/volume/util/BUILD @@ -17,6 +17,7 @@ go_library( "doc.go", "fs.go", "io_util.go", + "metrics.go", "util.go", ], tags = ["automanaged"], @@ -27,6 +28,7 @@ go_library( "//pkg/client/clientset_generated/clientset:go_default_library", "//pkg/util/mount:go_default_library", "//vendor/github.com/golang/glog:go_default_library", + "//vendor/github.com/prometheus/client_golang/prometheus:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", diff --git a/vendor/k8s.io/kubernetes/pkg/volume/util/metrics.go b/vendor/k8s.io/kubernetes/pkg/volume/util/metrics.go new file mode 100644 index 000000000000..087bbfff4169 --- /dev/null +++ b/vendor/k8s.io/kubernetes/pkg/volume/util/metrics.go @@ -0,0 +1,63 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "time" + + "github.com/prometheus/client_golang/prometheus" +) + +var storageOperationMetric = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "storage_operation_duration_seconds", + Help: "Storage operation duration", + }, + []string{"volume_plugin", "operation_name"}, +) + +var storageOperationErrorMetric = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "storage_operation_errors_total", + Help: "Storage operation errors", + }, + []string{"volume_plugin", "operation_name"}, +) + +func init() { + registerMetrics() +} + +func registerMetrics() { + prometheus.MustRegister(storageOperationMetric) + prometheus.MustRegister(storageOperationErrorMetric) +} + +// OperationCompleteHook returns a hook to call when an operation is completed +func OperationCompleteHook(plugin, operationName string) func(error) { + requestTime := time.Now() + opComplete := func(err error) { + timeTaken := time.Since(requestTime).Seconds() + // Create metric with operation name and plugin name + if err != nil { + storageOperationErrorMetric.WithLabelValues(plugin, operationName).Inc() + } else { + storageOperationMetric.WithLabelValues(plugin, operationName).Observe(timeTaken) + } + } + return opComplete +} diff --git a/vendor/k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go b/vendor/k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go index 64d70900a37b..a55ea70deca2 100644 --- a/vendor/k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go +++ b/vendor/k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go @@ -55,7 +55,7 @@ type NestedPendingOperations interface { // concatenation of volumeName and podName is removed from the list of // executing operations allowing a new operation to be started with the // volumeName without error. - Run(volumeName v1.UniqueVolumeName, podName types.UniquePodName, operationFunc func() error) error + Run(volumeName v1.UniqueVolumeName, podName types.UniquePodName, operationFunc func() error, operationCompleteFunc func(error)) error // Wait blocks until all operations are completed. This is typically // necessary during tests - the test should wait until all operations finish @@ -94,7 +94,8 @@ type operation struct { func (grm *nestedPendingOperations) Run( volumeName v1.UniqueVolumeName, podName types.UniquePodName, - operationFunc func() error) error { + operationFunc func() error, + operationCompleteFunc func(error)) error { grm.lock.Lock() defer grm.lock.Unlock() opExists, previousOpIndex := grm.isOperationExists(volumeName, podName) @@ -132,6 +133,7 @@ func (grm *nestedPendingOperations) Run( defer k8sRuntime.HandleCrash() // Handle completion of and error, if any, from operationFunc() defer grm.operationComplete(volumeName, podName, &err) + defer operationCompleteFunc(err) // Handle panic, if any, from operationFunc() defer k8sRuntime.RecoverFromPanic(&err) return operationFunc() diff --git a/vendor/k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations/nestedpendingoperations_test.go b/vendor/k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations/nestedpendingoperations_test.go index ce079407a63b..19e0d62fe480 100644 --- a/vendor/k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations/nestedpendingoperations_test.go +++ b/vendor/k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations/nestedpendingoperations_test.go @@ -50,7 +50,7 @@ func Test_NewGoRoutineMap_Positive_SingleOp(t *testing.T) { operation := func() error { return nil } // Act - err := grm.Run(volumeName, "" /* operationSubName */, operation) + err := grm.Run(volumeName, "" /* operationSubName */, operation, func(error) {}) // Assert if err != nil { @@ -66,8 +66,8 @@ func Test_NewGoRoutineMap_Positive_TwoOps(t *testing.T) { operation := func() error { return nil } // Act - err1 := grm.Run(volume1Name, "" /* operationSubName */, operation) - err2 := grm.Run(volume2Name, "" /* operationSubName */, operation) + err1 := grm.Run(volume1Name, "" /* operationSubName */, operation, func(error) {}) + err2 := grm.Run(volume2Name, "" /* operationSubName */, operation, func(error) {}) // Assert if err1 != nil { @@ -88,8 +88,8 @@ func Test_NewGoRoutineMap_Positive_TwoSubOps(t *testing.T) { operation := func() error { return nil } // Act - err1 := grm.Run(volumeName, operation1PodName, operation) - err2 := grm.Run(volumeName, operation2PodName, operation) + err1 := grm.Run(volumeName, operation1PodName, operation, func(error) {}) + err2 := grm.Run(volumeName, operation2PodName, operation, func(error) {}) // Assert if err1 != nil { @@ -108,7 +108,7 @@ func Test_NewGoRoutineMap_Positive_SingleOpWithExpBackoff(t *testing.T) { operation := func() error { return nil } // Act - err := grm.Run(volumeName, "" /* operationSubName */, operation) + err := grm.Run(volumeName, "" /* operationSubName */, operation, func(error) {}) // Assert if err != nil { @@ -122,7 +122,7 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletes(t *testing.T) { volumeName := v1.UniqueVolumeName("volume-name") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateCallbackFunc(operation1DoneCh) - err1 := grm.Run(volumeName, "" /* operationSubName */, operation1) + err1 := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {}) if err1 != nil { t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) } @@ -133,7 +133,7 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletes(t *testing.T) { err2 := retryWithExponentialBackOff( time.Duration(initialOperationWaitTimeShort), func() (bool, error) { - err := grm.Run(volumeName, "" /* operationSubName */, operation2) + err := grm.Run(volumeName, "" /* operationSubName */, operation2, func(error) {}) if err != nil { t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) return false, nil @@ -154,7 +154,7 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletesWithExpBackoff(t * volumeName := v1.UniqueVolumeName("volume-name") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateCallbackFunc(operation1DoneCh) - err1 := grm.Run(volumeName, "" /* operationSubName */, operation1) + err1 := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {}) if err1 != nil { t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) } @@ -165,7 +165,7 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletesWithExpBackoff(t * err2 := retryWithExponentialBackOff( time.Duration(initialOperationWaitTimeShort), func() (bool, error) { - err := grm.Run(volumeName, "" /* operationSubName */, operation2) + err := grm.Run(volumeName, "" /* operationSubName */, operation2, func(error) {}) if err != nil { t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) return false, nil @@ -185,7 +185,7 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanics(t *testing.T) { grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") operation1 := generatePanicFunc() - err1 := grm.Run(volumeName, "" /* operationSubName */, operation1) + err1 := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {}) if err1 != nil { t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) } @@ -195,7 +195,7 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanics(t *testing.T) { err2 := retryWithExponentialBackOff( time.Duration(initialOperationWaitTimeShort), func() (bool, error) { - err := grm.Run(volumeName, "" /* operationSubName */, operation2) + err := grm.Run(volumeName, "" /* operationSubName */, operation2, func(error) {}) if err != nil { t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) return false, nil @@ -215,7 +215,7 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanicsWithExpBackoff(t *tes grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") operation1 := generatePanicFunc() - err1 := grm.Run(volumeName, "" /* operationSubName */, operation1) + err1 := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {}) if err1 != nil { t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) } @@ -225,7 +225,7 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanicsWithExpBackoff(t *tes err2 := retryWithExponentialBackOff( time.Duration(initialOperationWaitTimeLong), // Longer duration to accommodate for backoff func() (bool, error) { - err := grm.Run(volumeName, "" /* operationSubName */, operation2) + err := grm.Run(volumeName, "" /* operationSubName */, operation2, func(error) {}) if err != nil { t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) return false, nil @@ -246,14 +246,14 @@ func Test_NewGoRoutineMap_Negative_SecondOpBeforeFirstCompletes(t *testing.T) { volumeName := v1.UniqueVolumeName("volume-name") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateWaitFunc(operation1DoneCh) - err1 := grm.Run(volumeName, "" /* operationSubName */, operation1) + err1 := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {}) if err1 != nil { t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) } operation2 := generateNoopFunc() // Act - err2 := grm.Run(volumeName, "" /* operationSubName */, operation2) + err2 := grm.Run(volumeName, "" /* operationSubName */, operation2, func(error) {}) // Assert if err2 == nil { @@ -271,14 +271,14 @@ func Test_NewGoRoutineMap_Negative_SecondSubOpBeforeFirstCompletes2(t *testing.T operationPodName := types.UniquePodName("operation-podname") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateWaitFunc(operation1DoneCh) - err1 := grm.Run(volumeName, operationPodName, operation1) + err1 := grm.Run(volumeName, operationPodName, operation1, func(error) {}) if err1 != nil { t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) } operation2 := generateNoopFunc() // Act - err2 := grm.Run(volumeName, operationPodName, operation2) + err2 := grm.Run(volumeName, operationPodName, operation2, func(error) {}) // Assert if err2 == nil { @@ -296,14 +296,14 @@ func Test_NewGoRoutineMap_Negative_SecondSubOpBeforeFirstCompletes(t *testing.T) operationPodName := types.UniquePodName("operation-podname") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateWaitFunc(operation1DoneCh) - err1 := grm.Run(volumeName, operationPodName, operation1) + err1 := grm.Run(volumeName, operationPodName, operation1, func(error) {}) if err1 != nil { t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) } operation2 := generateNoopFunc() // Act - err2 := grm.Run(volumeName, operationPodName, operation2) + err2 := grm.Run(volumeName, operationPodName, operation2, func(error) {}) // Assert if err2 == nil { @@ -320,14 +320,14 @@ func Test_NewGoRoutineMap_Negative_SecondOpBeforeFirstCompletesWithExpBackoff(t volumeName := v1.UniqueVolumeName("volume-name") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateWaitFunc(operation1DoneCh) - err1 := grm.Run(volumeName, "" /* operationSubName */, operation1) + err1 := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {}) if err1 != nil { t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) } operation2 := generateNoopFunc() // Act - err2 := grm.Run(volumeName, "" /* operationSubName */, operation2) + err2 := grm.Run(volumeName, "" /* operationSubName */, operation2, func(error) {}) // Assert if err2 == nil { @@ -344,7 +344,7 @@ func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletes(t *testing.T) { volumeName := v1.UniqueVolumeName("volume-name") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateWaitFunc(operation1DoneCh) - err1 := grm.Run(volumeName, "" /* operationSubName */, operation1) + err1 := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {}) if err1 != nil { t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) } @@ -352,7 +352,7 @@ func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletes(t *testing.T) { operation3 := generateNoopFunc() // Act - err2 := grm.Run(volumeName, "" /* operationSubName */, operation2) + err2 := grm.Run(volumeName, "" /* operationSubName */, operation2, func(error) {}) // Assert if err2 == nil { @@ -367,7 +367,7 @@ func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletes(t *testing.T) { err3 := retryWithExponentialBackOff( time.Duration(initialOperationWaitTimeShort), func() (bool, error) { - err := grm.Run(volumeName, "" /* operationSubName */, operation3) + err := grm.Run(volumeName, "" /* operationSubName */, operation3, func(error) {}) if err != nil { t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) return false, nil @@ -388,7 +388,7 @@ func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletesWithExpBackoff(t *t volumeName := v1.UniqueVolumeName("volume-name") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateWaitFunc(operation1DoneCh) - err1 := grm.Run(volumeName, "" /* operationSubName */, operation1) + err1 := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {}) if err1 != nil { t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) } @@ -396,7 +396,7 @@ func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletesWithExpBackoff(t *t operation3 := generateNoopFunc() // Act - err2 := grm.Run(volumeName, "" /* operationSubName */, operation2) + err2 := grm.Run(volumeName, "" /* operationSubName */, operation2, func(error) {}) // Assert if err2 == nil { @@ -411,7 +411,7 @@ func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletesWithExpBackoff(t *t err3 := retryWithExponentialBackOff( time.Duration(initialOperationWaitTimeShort), func() (bool, error) { - err := grm.Run(volumeName, "" /* operationSubName */, operation3) + err := grm.Run(volumeName, "" /* operationSubName */, operation3, func(error) {}) if err != nil { t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) return false, nil @@ -471,7 +471,7 @@ func Test_NewGoRoutineMap_Positive_Wait(t *testing.T) { volumeName := v1.UniqueVolumeName("volume-name") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateWaitFunc(operation1DoneCh) - err := grm.Run(volumeName, "" /* operationSubName */, operation1) + err := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {}) if err != nil { t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err) } @@ -500,7 +500,7 @@ func Test_NewGoRoutineMap_Positive_WaitWithExpBackoff(t *testing.T) { volumeName := v1.UniqueVolumeName("volume-name") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateWaitFunc(operation1DoneCh) - err := grm.Run(volumeName, "" /* operationSubName */, operation1) + err := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {}) if err != nil { t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err) } diff --git a/vendor/k8s.io/kubernetes/pkg/volume/util/operationexecutor/operation_executor.go b/vendor/k8s.io/kubernetes/pkg/volume/util/operationexecutor/operation_executor.go index 0c1569095f50..da95adbba975 100644 --- a/vendor/k8s.io/kubernetes/pkg/volume/util/operationexecutor/operation_executor.go +++ b/vendor/k8s.io/kubernetes/pkg/volume/util/operationexecutor/operation_executor.go @@ -31,6 +31,7 @@ import ( "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/volume" + "k8s.io/kubernetes/pkg/volume/util" "k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations" volumetypes "k8s.io/kubernetes/pkg/volume/util/types" "k8s.io/kubernetes/pkg/volume/util/volumehelper" @@ -535,29 +536,32 @@ func (oe *operationExecutor) IsOperationPending(volumeName v1.UniqueVolumeName, func (oe *operationExecutor) AttachVolume( volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error { - attachFunc, err := + attachFunc, plugin, err := oe.operationGenerator.GenerateAttachVolumeFunc(volumeToAttach, actualStateOfWorld) if err != nil { return err } + opCompleteFunc := util.OperationCompleteHook(plugin, "volume_attach") return oe.pendingOperations.Run( - volumeToAttach.VolumeName, "" /* podName */, attachFunc) + volumeToAttach.VolumeName, "" /* podName */, attachFunc, opCompleteFunc) } func (oe *operationExecutor) DetachVolume( volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error { - detachFunc, err := + detachFunc, plugin, err := oe.operationGenerator.GenerateDetachVolumeFunc(volumeToDetach, verifySafeToDetach, actualStateOfWorld) if err != nil { return err } + opCompleteFunc := util.OperationCompleteHook(plugin, "volume_detach") return oe.pendingOperations.Run( - volumeToDetach.VolumeName, "" /* podName */, detachFunc) + volumeToDetach.VolumeName, "" /* podName */, detachFunc, opCompleteFunc) } + func (oe *operationExecutor) VerifyVolumesAreAttached( attachedVolumes map[types.NodeName][]AttachedVolume, actualStateOfWorld ActualStateOfWorldAttacherUpdater) { @@ -630,9 +634,11 @@ func (oe *operationExecutor) VerifyVolumesAreAttached( if err != nil { glog.Errorf("BulkVerifyVolumes.GenerateBulkVolumeVerifyFunc error bulk verifying volumes for plugin %q with %v", pluginName, err) } + + opCompleteFunc := util.OperationCompleteHook(pluginName, "verify_volumes_are_attached") // Ugly hack to ensure - we don't do parallel bulk polling of same volume plugin uniquePluginName := v1.UniqueVolumeName(pluginName) - err = oe.pendingOperations.Run(uniquePluginName, "" /* Pod Name */, bulkVerifyVolumeFunc) + err = oe.pendingOperations.Run(uniquePluginName, "" /* Pod Name */, bulkVerifyVolumeFunc, opCompleteFunc) if err != nil { glog.Errorf("BulkVerifyVolumes.Run Error bulk volume verification for plugin %q with %v", pluginName, err) } @@ -648,8 +654,10 @@ func (oe *operationExecutor) VerifyVolumesAreAttachedPerNode( if err != nil { return err } + + opCompleteFunc := util.OperationCompleteHook("", "verify_volumes_are_attached_per_node") // Give an empty UniqueVolumeName so that this operation could be executed concurrently. - return oe.pendingOperations.Run("" /* volumeName */, "" /* podName */, volumesAreAttachedFunc) + return oe.pendingOperations.Run("" /* volumeName */, "" /* podName */, volumesAreAttachedFunc, opCompleteFunc) } func (oe *operationExecutor) MountVolume( @@ -657,7 +665,7 @@ func (oe *operationExecutor) MountVolume( volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, isRemount bool) error { - mountFunc, err := oe.operationGenerator.GenerateMountVolumeFunc( + mountFunc, plugin, err := oe.operationGenerator.GenerateMountVolumeFunc( waitForAttachTimeout, volumeToMount, actualStateOfWorld, isRemount) if err != nil { return err @@ -671,15 +679,17 @@ func (oe *operationExecutor) MountVolume( podName = volumehelper.GetUniquePodName(volumeToMount.Pod) } + // TODO mount_device + opCompleteFunc := util.OperationCompleteHook(plugin, "volume_mount") return oe.pendingOperations.Run( - volumeToMount.VolumeName, podName, mountFunc) + volumeToMount.VolumeName, podName, mountFunc, opCompleteFunc) } func (oe *operationExecutor) UnmountVolume( volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) error { - unmountFunc, err := + unmountFunc, plugin, err := oe.operationGenerator.GenerateUnmountVolumeFunc(volumeToUnmount, actualStateOfWorld) if err != nil { return err @@ -689,36 +699,39 @@ func (oe *operationExecutor) UnmountVolume( // same volume in parallel podName := volumetypes.UniquePodName(volumeToUnmount.PodUID) + opCompleteFunc := util.OperationCompleteHook(plugin, "volume_unmount") return oe.pendingOperations.Run( - volumeToUnmount.VolumeName, podName, unmountFunc) + volumeToUnmount.VolumeName, podName, unmountFunc, opCompleteFunc) } func (oe *operationExecutor) UnmountDevice( deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) error { - unmountDeviceFunc, err := + unmountDeviceFunc, plugin, err := oe.operationGenerator.GenerateUnmountDeviceFunc(deviceToDetach, actualStateOfWorld, mounter) if err != nil { return err } + opCompleteFunc := util.OperationCompleteHook(plugin, "unmount_device") return oe.pendingOperations.Run( - deviceToDetach.VolumeName, "" /* podName */, unmountDeviceFunc) + deviceToDetach.VolumeName, "" /* podName */, unmountDeviceFunc, opCompleteFunc) } func (oe *operationExecutor) VerifyControllerAttachedVolume( volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error { - verifyControllerAttachedVolumeFunc, err := + verifyControllerAttachedVolumeFunc, plugin, err := oe.operationGenerator.GenerateVerifyControllerAttachedVolumeFunc(volumeToMount, nodeName, actualStateOfWorld) if err != nil { return err } + opCompleteFunc := util.OperationCompleteHook(plugin, "verify_controller_attached_volume") return oe.pendingOperations.Run( - volumeToMount.VolumeName, "" /* podName */, verifyControllerAttachedVolumeFunc) + volumeToMount.VolumeName, "" /* podName */, verifyControllerAttachedVolumeFunc, opCompleteFunc) } // TODO: this is a workaround for the unmount device issue caused by gci mounter. diff --git a/vendor/k8s.io/kubernetes/pkg/volume/util/operationexecutor/operation_executor_test.go b/vendor/k8s.io/kubernetes/pkg/volume/util/operationexecutor/operation_executor_test.go index b312b29d451b..941b7bf88c45 100644 --- a/vendor/k8s.io/kubernetes/pkg/volume/util/operationexecutor/operation_executor_test.go +++ b/vendor/k8s.io/kubernetes/pkg/volume/util/operationexecutor/operation_executor_test.go @@ -239,29 +239,29 @@ func newFakeOperationGenerator(ch chan interface{}, quit chan interface{}) Opera } } -func (fopg *fakeOperationGenerator) GenerateMountVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater, isRemount bool) (func() error, error) { +func (fopg *fakeOperationGenerator) GenerateMountVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater, isRemount bool) (func() error, string, error) { return func() error { startOperationAndBlock(fopg.ch, fopg.quit) return nil - }, nil + }, "", nil } -func (fopg *fakeOperationGenerator) GenerateUnmountVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (func() error, error) { +func (fopg *fakeOperationGenerator) GenerateUnmountVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (func() error, string, error) { return func() error { startOperationAndBlock(fopg.ch, fopg.quit) return nil - }, nil + }, "", nil } -func (fopg *fakeOperationGenerator) GenerateAttachVolumeFunc(volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) { +func (fopg *fakeOperationGenerator) GenerateAttachVolumeFunc(volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, string, error) { return func() error { startOperationAndBlock(fopg.ch, fopg.quit) return nil - }, nil + }, "", nil } -func (fopg *fakeOperationGenerator) GenerateDetachVolumeFunc(volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) { +func (fopg *fakeOperationGenerator) GenerateDetachVolumeFunc(volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, string, error) { return func() error { startOperationAndBlock(fopg.ch, fopg.quit) return nil - }, nil + }, "", nil } func (fopg *fakeOperationGenerator) GenerateVolumesAreAttachedFunc(attachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) { return func() error { @@ -269,17 +269,17 @@ func (fopg *fakeOperationGenerator) GenerateVolumesAreAttachedFunc(attachedVolum return nil }, nil } -func (fopg *fakeOperationGenerator) GenerateUnmountDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (func() error, error) { +func (fopg *fakeOperationGenerator) GenerateUnmountDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (func() error, string, error) { return func() error { startOperationAndBlock(fopg.ch, fopg.quit) return nil - }, nil + }, "", nil } -func (fopg *fakeOperationGenerator) GenerateVerifyControllerAttachedVolumeFunc(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) { +func (fopg *fakeOperationGenerator) GenerateVerifyControllerAttachedVolumeFunc(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, string, error) { return func() error { startOperationAndBlock(fopg.ch, fopg.quit) return nil - }, nil + }, "", nil } func (fopg *fakeOperationGenerator) GenerateBulkVolumeVerifyFunc( diff --git a/vendor/k8s.io/kubernetes/pkg/volume/util/operationexecutor/operation_generator.go b/vendor/k8s.io/kubernetes/pkg/volume/util/operationexecutor/operation_generator.go index 9e81eed67b66..b97ae9630f00 100644 --- a/vendor/k8s.io/kubernetes/pkg/volume/util/operationexecutor/operation_generator.go +++ b/vendor/k8s.io/kubernetes/pkg/volume/util/operationexecutor/operation_generator.go @@ -73,25 +73,25 @@ func NewOperationGenerator(kubeClient clientset.Interface, // OperationGenerator interface that extracts out the functions from operation_executor to make it dependency injectable type OperationGenerator interface { // Generates the MountVolume function needed to perform the mount of a volume plugin - GenerateMountVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater, isRemount bool) (func() error, error) + GenerateMountVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater, isRemount bool) (func() error, string, error) // Generates the UnmountVolume function needed to perform the unmount of a volume plugin - GenerateUnmountVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (func() error, error) + GenerateUnmountVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (func() error, string, error) // Generates the AttachVolume function needed to perform attach of a volume plugin - GenerateAttachVolumeFunc(volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) + GenerateAttachVolumeFunc(volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, string, error) // Generates the DetachVolume function needed to perform the detach of a volume plugin - GenerateDetachVolumeFunc(volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) + GenerateDetachVolumeFunc(volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, string, error) // Generates the VolumesAreAttached function needed to verify if volume plugins are attached GenerateVolumesAreAttachedFunc(attachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) // Generates the UnMountDevice function needed to perform the unmount of a device - GenerateUnmountDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (func() error, error) + GenerateUnmountDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (func() error, string, error) // Generates the function needed to check if the attach_detach controller has attached the volume plugin - GenerateVerifyControllerAttachedVolumeFunc(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) + GenerateVerifyControllerAttachedVolumeFunc(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, string, error) // GetVolumePluginMgr returns volume plugin manager GetVolumePluginMgr() *volume.VolumePluginMgr @@ -245,17 +245,17 @@ func (og *operationGenerator) GenerateBulkVolumeVerifyFunc( func (og *operationGenerator) GenerateAttachVolumeFunc( volumeToAttach VolumeToAttach, - actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) { + actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, string, error) { // Get attacher plugin attachableVolumePlugin, err := og.volumePluginMgr.FindAttachablePluginBySpec(volumeToAttach.VolumeSpec) if err != nil || attachableVolumePlugin == nil { - return nil, volumeToAttach.GenerateErrorDetailed("AttachVolume.FindAttachablePluginBySpec failed", err) + return nil, "", volumeToAttach.GenerateErrorDetailed("AttachVolume.FindAttachablePluginBySpec failed", err) } volumeAttacher, newAttacherErr := attachableVolumePlugin.NewAttacher() if newAttacherErr != nil { - return nil, volumeToAttach.GenerateErrorDetailed("AttachVolume.NewAttacher failed", newAttacherErr) + return nil, attachableVolumePlugin.GetPluginName(), volumeToAttach.GenerateErrorDetailed("AttachVolume.NewAttacher failed", newAttacherErr) } return func() error { @@ -283,7 +283,7 @@ func (og *operationGenerator) GenerateAttachVolumeFunc( } return nil - }, nil + }, attachableVolumePlugin.GetPluginName(), nil } func (og *operationGenerator) GetVolumePluginMgr() *volume.VolumePluginMgr { @@ -293,9 +293,10 @@ func (og *operationGenerator) GetVolumePluginMgr() *volume.VolumePluginMgr { func (og *operationGenerator) GenerateDetachVolumeFunc( volumeToDetach AttachedVolume, verifySafeToDetach bool, - actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) { + actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, string, error) { var volumeName string var attachableVolumePlugin volume.AttachableVolumePlugin + var pluginName string var err error if volumeToDetach.VolumeSpec != nil { @@ -303,31 +304,35 @@ func (og *operationGenerator) GenerateDetachVolumeFunc( attachableVolumePlugin, err = og.volumePluginMgr.FindAttachablePluginBySpec(volumeToDetach.VolumeSpec) if err != nil || attachableVolumePlugin == nil { - return nil, volumeToDetach.GenerateErrorDetailed("DetachVolume.FindAttachablePluginBySpec failed", err) + return nil, "", volumeToDetach.GenerateErrorDetailed("DetachVolume.FindAttachablePluginBySpec failed", err) } volumeName, err = attachableVolumePlugin.GetVolumeName(volumeToDetach.VolumeSpec) if err != nil { - return nil, volumeToDetach.GenerateErrorDetailed("DetachVolume.GetVolumeName failed", err) + return nil, attachableVolumePlugin.GetPluginName(), volumeToDetach.GenerateErrorDetailed("DetachVolume.GetVolumeName failed", err) } } else { - var pluginName string // Get attacher plugin and the volumeName by splitting the volume unique name in case // there's no VolumeSpec: this happens only on attach/detach controller crash recovery // when a pod has been deleted during the controller downtime pluginName, volumeName, err = volumehelper.SplitUniqueName(volumeToDetach.VolumeName) if err != nil { - return nil, volumeToDetach.GenerateErrorDetailed("DetachVolume.SplitUniqueName failed", err) + return nil, pluginName, volumeToDetach.GenerateErrorDetailed("DetachVolume.SplitUniqueName failed", err) } attachableVolumePlugin, err = og.volumePluginMgr.FindAttachablePluginByName(pluginName) if err != nil { - return nil, volumeToDetach.GenerateErrorDetailed("DetachVolume.FindAttachablePluginBySpec failed", err) + return nil, pluginName, volumeToDetach.GenerateErrorDetailed("DetachVolume.FindAttachablePluginBySpec failed", err) } } + + if pluginName == "" { + pluginName = attachableVolumePlugin.GetPluginName() + } + volumeDetacher, err := attachableVolumePlugin.NewDetacher() if err != nil { - return nil, volumeToDetach.GenerateErrorDetailed("DetachVolume.NewDetacher failed", err) + return nil, pluginName, volumeToDetach.GenerateErrorDetailed("DetachVolume.NewDetacher failed", err) } return func() error { @@ -352,24 +357,24 @@ func (og *operationGenerator) GenerateDetachVolumeFunc( volumeToDetach.VolumeName, volumeToDetach.NodeName) return nil - }, nil + }, pluginName, nil } func (og *operationGenerator) GenerateMountVolumeFunc( waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, - isRemount bool) (func() error, error) { + isRemount bool) (func() error, string, error) { // Get mounter plugin volumePlugin, err := og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec) if err != nil || volumePlugin == nil { - return nil, volumeToMount.GenerateErrorDetailed("MountVolume.FindPluginBySpec failed", err) + return nil, "", volumeToMount.GenerateErrorDetailed("MountVolume.FindPluginBySpec failed", err) } affinityErr := checkNodeAffinity(og, volumeToMount, volumePlugin) if affinityErr != nil { - return nil, affinityErr + return nil, volumePlugin.GetPluginName(), affinityErr } volumeMounter, newMounterErr := volumePlugin.NewMounter( @@ -379,13 +384,13 @@ func (og *operationGenerator) GenerateMountVolumeFunc( if newMounterErr != nil { eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.NewMounter initialization failed", newMounterErr) og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMountVolume, eventErr.Error()) - return nil, detailedErr + return nil, volumePlugin.GetPluginName(), detailedErr } mountCheckError := checkMountOptionSupport(og, volumeToMount, volumePlugin) if mountCheckError != nil { - return nil, mountCheckError + return nil, volumePlugin.GetPluginName(), mountCheckError } // Get attacher, if possible @@ -489,23 +494,23 @@ func (og *operationGenerator) GenerateMountVolumeFunc( } return nil - }, nil + }, volumePlugin.GetPluginName(), nil } func (og *operationGenerator) GenerateUnmountVolumeFunc( volumeToUnmount MountedVolume, - actualStateOfWorld ActualStateOfWorldMounterUpdater) (func() error, error) { + actualStateOfWorld ActualStateOfWorldMounterUpdater) (func() error, string, error) { // Get mountable plugin volumePlugin, err := og.volumePluginMgr.FindPluginByName(volumeToUnmount.PluginName) if err != nil || volumePlugin == nil { - return nil, volumeToUnmount.GenerateErrorDetailed("UnmountVolume.FindPluginByName failed", err) + return nil, "", volumeToUnmount.GenerateErrorDetailed("UnmountVolume.FindPluginByName failed", err) } volumeUnmounter, newUnmounterErr := volumePlugin.NewUnmounter( volumeToUnmount.InnerVolumeSpecName, volumeToUnmount.PodUID) if newUnmounterErr != nil { - return nil, volumeToUnmount.GenerateErrorDetailed("UnmountVolume.NewUnmounter failed", newUnmounterErr) + return nil, volumePlugin.GetPluginName(), volumeToUnmount.GenerateErrorDetailed("UnmountVolume.NewUnmounter failed", newUnmounterErr) } return func() error { @@ -535,28 +540,28 @@ func (og *operationGenerator) GenerateUnmountVolumeFunc( } return nil - }, nil + }, volumePlugin.GetPluginName(), nil } func (og *operationGenerator) GenerateUnmountDeviceFunc( deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, - mounter mount.Interface) (func() error, error) { + mounter mount.Interface) (func() error, string, error) { // Get attacher plugin attachableVolumePlugin, err := og.volumePluginMgr.FindAttachablePluginBySpec(deviceToDetach.VolumeSpec) if err != nil || attachableVolumePlugin == nil { - return nil, deviceToDetach.GenerateErrorDetailed("UnmountDevice.FindAttachablePluginBySpec failed", err) + return nil, "", deviceToDetach.GenerateErrorDetailed("UnmountDevice.FindAttachablePluginBySpec failed", err) } volumeDetacher, err := attachableVolumePlugin.NewDetacher() if err != nil { - return nil, deviceToDetach.GenerateErrorDetailed("UnmountDevice.NewDetacher failed", err) + return nil, attachableVolumePlugin.GetPluginName(), deviceToDetach.GenerateErrorDetailed("UnmountDevice.NewDetacher failed", err) } volumeAttacher, err := attachableVolumePlugin.NewAttacher() if err != nil { - return nil, deviceToDetach.GenerateErrorDetailed("UnmountDevice.NewAttacher failed", err) + return nil, attachableVolumePlugin.GetPluginName(), deviceToDetach.GenerateErrorDetailed("UnmountDevice.NewAttacher failed", err) } return func() error { @@ -616,13 +621,19 @@ func (og *operationGenerator) GenerateUnmountDeviceFunc( } return nil - }, nil + }, attachableVolumePlugin.GetPluginName(), nil } func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc( volumeToMount VolumeToMount, nodeName types.NodeName, - actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) { + actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, string, error) { + volumePlugin, err := + og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec) + if err != nil || volumePlugin == nil { + return nil, "", volumeToMount.GenerateErrorDetailed("VerifyControllerAttachedVolume.FindPluginBySpec failed", err) + } + return func() error { if !volumeToMount.PluginIsAttachable { // If the volume does not implement the attacher interface, it is @@ -678,7 +689,7 @@ func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc( // Volume not attached, return error. Caller will log and retry. return volumeToMount.GenerateErrorDetailed("Volume not attached according to node status", nil) - }, nil + }, volumePlugin.GetPluginName(), nil } func (og *operationGenerator) verifyVolumeIsSafeToDetach(