diff --git a/vendor/sigs.k8s.io/cloud-provider-azure/pkg/azureclients/vmclient/azure_vmclient.go b/vendor/sigs.k8s.io/cloud-provider-azure/pkg/azureclients/vmclient/azure_vmclient.go
index c3357964e3..67aaaac613 100644
--- a/vendor/sigs.k8s.io/cloud-provider-azure/pkg/azureclients/vmclient/azure_vmclient.go
+++ b/vendor/sigs.k8s.io/cloud-provider-azure/pkg/azureclients/vmclient/azure_vmclient.go
@@ -324,34 +324,32 @@ func (c *Client) listVmssFlexVMs(ctx context.Context, vmssFlexID string, statusO
 }
 
 // Update updates a VirtualMachine.
-func (c *Client) Update(ctx context.Context, resourceGroupName string, VMName string, parameters compute.VirtualMachineUpdate, source string) *retry.Error {
+func (c *Client) Update(ctx context.Context, resourceGroupName string, VMName string, parameters compute.VirtualMachineUpdate, source string) (*compute.VirtualMachine, *retry.Error) {
 	mc := metrics.NewMetricContext("vm", "update", resourceGroupName, c.subscriptionID, source)
 
 	// Report errors if the client is rate limited.
 	if !c.rateLimiterWriter.TryAccept() {
 		mc.RateLimitedCount()
-		return retry.GetRateLimitError(true, "VMUpdate")
+		return nil, retry.GetRateLimitError(true, "VMUpdate")
 	}
 
 	// Report errors if the client is throttled.
 	if c.RetryAfterWriter.After(time.Now()) {
 		mc.ThrottledCount()
 		rerr := retry.GetThrottlingError("VMUpdate", "client throttled", c.RetryAfterWriter)
-		return rerr
+		return nil, rerr
 	}
 
-	rerr := c.updateVM(ctx, resourceGroupName, VMName, parameters, source)
+	result, rerr := c.updateVM(ctx, resourceGroupName, VMName, parameters, source)
 	mc.Observe(rerr)
 	if rerr != nil {
 		if rerr.IsThrottled() {
 			// Update RetryAfterReader so that no more requests would be sent until RetryAfter expires.
 			c.RetryAfterWriter = rerr.RetryAfter
 		}
-
-		return rerr
+		return result, rerr
 	}
-
-	return nil
+	return result, rerr
 }
 
 // UpdateAsync updates a VirtualMachine asynchronously
@@ -393,7 +391,7 @@ func (c *Client) UpdateAsync(ctx context.Context, resourceGroupName string, VMNa
 }
 
 // WaitForUpdateResult waits for the response of the update request
-func (c *Client) WaitForUpdateResult(ctx context.Context, future *azure.Future, resourceGroupName, source string) *retry.Error {
+func (c *Client) WaitForUpdateResult(ctx context.Context, future *azure.Future, resourceGroupName, source string) (*compute.VirtualMachine, *retry.Error) {
 	mc := metrics.NewMetricContext("vm", "wait_for_update_result", resourceGroupName, c.subscriptionID, source)
 	response, err := c.armClient.WaitForAsyncOperationResult(ctx, future, "VMWaitForUpdateResult")
 	mc.Observe(retry.NewErrorOrNil(false, err))
@@ -405,13 +403,24 @@ func (c *Client) WaitForUpdateResult(ctx context.Context, future *azure.Future,
 		} else {
 			klog.V(5).Infof("Received error in WaitForAsyncOperationResult: '%s', no response", err.Error())
 		}
-		return retry.GetError(response, err)
+		return nil, retry.GetError(response, err)
 	}
 
-	return nil
+	if response != nil && response.StatusCode != http.StatusNoContent {
+		result, rerr := c.updateResponder(response)
+		if rerr != nil {
+			klog.V(5).Infof("Received error in WaitForAsyncOperationResult updateResponder: '%s'", rerr.Error())
+		}
+
+		return result, rerr
+	}
+
+	result := &compute.VirtualMachine{}
+	result.Response = autorest.Response{Response: response}
+	return result, nil
 }
 
 // updateVM updates a VirtualMachine.
-func (c *Client) updateVM(ctx context.Context, resourceGroupName string, VMName string, parameters compute.VirtualMachineUpdate, source string) *retry.Error {
+func (c *Client) updateVM(ctx context.Context, resourceGroupName string, VMName string, parameters compute.VirtualMachineUpdate, source string) (*compute.VirtualMachine, *retry.Error) {
 	resourceID := armclient.GetResourceID(
 		c.subscriptionID,
 		resourceGroupName,
@@ -423,18 +432,20 @@ func (c *Client) updateVM(ctx context.Context, resourceGroupName string, VMName
 	defer c.armClient.CloseResponse(ctx, response)
 	if rerr != nil {
 		klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "vm.put.request", resourceID, rerr.Error())
-		return rerr
+		return nil, rerr
 	}
 
 	if response != nil && response.StatusCode != http.StatusNoContent {
-		_, rerr = c.updateResponder(response)
+		result, rerr := c.updateResponder(response)
 		if rerr != nil {
 			klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "vm.put.respond", resourceID, rerr.Error())
-			return rerr
 		}
+		return result, rerr
 	}
 
-	return nil
+	result := &compute.VirtualMachine{}
+	result.Response = autorest.Response{Response: response}
+	return result, nil
 }
 
 func (c *Client) updateResponder(resp *http.Response) (*compute.VirtualMachine, *retry.Error) {
diff --git a/vendor/sigs.k8s.io/cloud-provider-azure/pkg/azureclients/vmclient/interface.go b/vendor/sigs.k8s.io/cloud-provider-azure/pkg/azureclients/vmclient/interface.go
index 5c41811df2..20058fa980 100644
--- a/vendor/sigs.k8s.io/cloud-provider-azure/pkg/azureclients/vmclient/interface.go
+++ b/vendor/sigs.k8s.io/cloud-provider-azure/pkg/azureclients/vmclient/interface.go
@@ -53,13 +53,13 @@ type Interface interface {
 	CreateOrUpdate(ctx context.Context, resourceGroupName string, VMName string, parameters compute.VirtualMachine, source string) *retry.Error
 
 	// Update updates a VirtualMachine.
-	Update(ctx context.Context, resourceGroupName string, VMName string, parameters compute.VirtualMachineUpdate, source string) *retry.Error
+	Update(ctx context.Context, resourceGroupName string, VMName string, parameters compute.VirtualMachineUpdate, source string) (*compute.VirtualMachine, *retry.Error)
 
 	// UpdateAsync updates a VirtualMachine asynchronously
 	UpdateAsync(ctx context.Context, resourceGroupName string, VMName string, parameters compute.VirtualMachineUpdate, source string) (*azure.Future, *retry.Error)
 
 	// WaitForUpdateResult waits for the response of the update request
-	WaitForUpdateResult(ctx context.Context, future *azure.Future, resourceGroupName, source string) *retry.Error
+	WaitForUpdateResult(ctx context.Context, future *azure.Future, resourceGroupName, source string) (*compute.VirtualMachine, *retry.Error)
 
 	// Delete deletes a VirtualMachine.
 	Delete(ctx context.Context, resourceGroupName string, VMName string) *retry.Error
diff --git a/vendor/sigs.k8s.io/cloud-provider-azure/pkg/azureclients/vmclient/mockvmclient/interface.go b/vendor/sigs.k8s.io/cloud-provider-azure/pkg/azureclients/vmclient/mockvmclient/interface.go
index 4ecd399f50..29845eec07 100644
--- a/vendor/sigs.k8s.io/cloud-provider-azure/pkg/azureclients/vmclient/mockvmclient/interface.go
+++ b/vendor/sigs.k8s.io/cloud-provider-azure/pkg/azureclients/vmclient/mockvmclient/interface.go
@@ -143,11 +143,12 @@ func (mr *MockInterfaceMockRecorder) ListVmssFlexVMsWithoutInstanceView(ctx, vms
 }
 
 // Update mocks base method.
-func (m *MockInterface) Update(ctx context.Context, resourceGroupName, VMName string, parameters compute.VirtualMachineUpdate, source string) *retry.Error {
+func (m *MockInterface) Update(ctx context.Context, resourceGroupName, VMName string, parameters compute.VirtualMachineUpdate, source string) (*compute.VirtualMachine, *retry.Error) {
 	m.ctrl.T.Helper()
 	ret := m.ctrl.Call(m, "Update", ctx, resourceGroupName, VMName, parameters, source)
-	ret0, _ := ret[0].(*retry.Error)
-	return ret0
+	ret0, _ := ret[0].(*compute.VirtualMachine)
+	ret1, _ := ret[1].(*retry.Error)
+	return ret0, ret1
 }
 
 // Update indicates an expected call of Update.
@@ -172,11 +173,12 @@ func (mr *MockInterfaceMockRecorder) UpdateAsync(ctx, resourceGroupName, VMName,
 }
 
 // WaitForUpdateResult mocks base method.
-func (m *MockInterface) WaitForUpdateResult(ctx context.Context, future *azure.Future, resourceGroupName, source string) *retry.Error {
+func (m *MockInterface) WaitForUpdateResult(ctx context.Context, future *azure.Future, resourceGroupName, source string) (*compute.VirtualMachine, *retry.Error) {
 	m.ctrl.T.Helper()
 	ret := m.ctrl.Call(m, "WaitForUpdateResult", ctx, future, resourceGroupName, source)
-	ret0, _ := ret[0].(*retry.Error)
-	return ret0
+	ret0, _ := ret[0].(*compute.VirtualMachine)
+	ret1, _ := ret[1].(*retry.Error)
+	return ret0, ret1
 }
 
 // WaitForUpdateResult indicates an expected call of WaitForUpdateResult.
diff --git a/vendor/sigs.k8s.io/cloud-provider-azure/pkg/consts/consts.go b/vendor/sigs.k8s.io/cloud-provider-azure/pkg/consts/consts.go
index 04838b5d1d..80d26c353b 100644
--- a/vendor/sigs.k8s.io/cloud-provider-azure/pkg/consts/consts.go
+++ b/vendor/sigs.k8s.io/cloud-provider-azure/pkg/consts/consts.go
@@ -383,6 +383,8 @@ const (
 	CannotUpdateVMBeingDeletedMessagePrefix = "'Put on Virtual Machine Scale Set VM Instance' is not allowed on Virtual Machine Scale Set"
 	// CannotUpdateVMBeingDeletedMessageSuffix is the suffix of the error message that the request failed due to delete a VM that is being deleted
 	CannotUpdateVMBeingDeletedMessageSuffix = "since it is marked for deletion"
+	// OperationPreemptedErrorCode is the error code returned for vm operation preempted errors
+	OperationPreemptedErrorCode = "OperationPreempted"
 )
 
 // node ipam controller
diff --git a/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_controller_common.go b/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_controller_common.go
index c45b75d7d3..e6c2b7670c 100644
--- a/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_controller_common.go
+++ b/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_controller_common.go
@@ -28,6 +28,7 @@ import (
 	"time"
 
 	"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2022-03-01/compute"
+	"github.com/Azure/go-autorest/autorest/azure"
 
 	"k8s.io/apimachinery/pkg/types"
 	kwait "k8s.io/apimachinery/pkg/util/wait"
@@ -38,6 +39,7 @@ import (
 
 	azcache "sigs.k8s.io/cloud-provider-azure/pkg/cache"
 	"sigs.k8s.io/cloud-provider-azure/pkg/consts"
+	"sigs.k8s.io/cloud-provider-azure/pkg/retry"
 )
 
 const (
@@ -54,6 +56,10 @@ const (
 	attachDiskMapKeySuffix = "attachdiskmap"
 	detachDiskMapKeySuffix = "detachdiskmap"
 
+	updateVMRetryDuration = time.Duration(1) * time.Second
+	updateVMRetryFactor   = 3.0
+	updateVMRetrySteps    = 5
+
 	// WriteAcceleratorEnabled support for Azure Write Accelerator on Azure Disks
 	// https://docs.microsoft.com/azure/virtual-machines/windows/how-to-enable-write-accelerator
 	WriteAcceleratorEnabled = "writeacceleratorenabled"
@@ -72,9 +78,16 @@ var defaultBackOff = kwait.Backoff{
 	Jitter: 0.0,
 }
 
+var updateVMBackoff = kwait.Backoff{
+	Duration: updateVMRetryDuration,
+	Factor:   updateVMRetryFactor,
+	Steps:    updateVMRetrySteps,
+}
+
 var (
 	managedDiskPathRE  = regexp.MustCompile(`.*/subscriptions/(?:.*)/resourceGroups/(?:.*)/providers/Microsoft.Compute/disks/(.+)`)
 	diskSnapshotPathRE = regexp.MustCompile(`.*/subscriptions/(?:.*)/resourceGroups/(?:.*)/providers/Microsoft.Compute/snapshots/(.+)`)
+	errorCodeRE        = regexp.MustCompile(`Code="(.*?)".*`)
 )
 
 type controllerCommon struct {
@@ -253,10 +266,20 @@ func (c *controllerCommon) AttachDisk(ctx context.Context, async bool, diskName,
 	}
 	c.diskStateMap.Store(disk, "attaching")
 	defer c.diskStateMap.Delete(disk)
-	future, err := vmset.AttachDisk(ctx, nodeName, diskMap)
+
+	defer func() {
+		// invalidate the cache if there is error in disk attach
+		if err != nil {
+			_ = vmset.DeleteCacheForNode(string(nodeName))
+		}
+	}()
+
+	var future *azure.Future
+	future, err = vmset.AttachDisk(ctx, nodeName, diskMap)
 	if err != nil {
 		return -1, err
 	}
+	// err will be handled by waitForUpdateResult below
 
 	if async && c.diskOpRateLimiter.TryAccept() {
 		// unlock and wait for attach disk complete
@@ -267,7 +290,40 @@ func (c *controllerCommon) AttachDisk(ctx context.Context, async bool, diskName,
 			klog.Warningf("azureDisk - switch to batch operation due to rate limited, QPS: %f", c.diskOpRateLimiter.QPS())
 		}
 	}
-	return lun, vmset.WaitForUpdateResult(ctx, future, nodeName, "attach_disk")
+
+	if err = c.waitForUpdateResult(ctx, vmset, nodeName, future, err); err != nil {
+		return -1, err
+	}
+	return lun, nil
+}
+
+// waitForUpdateResult handles asynchronous VM update operations and retries with backoff if OperationPreempted error is observed
+func (c *controllerCommon) waitForUpdateResult(ctx context.Context, vmset VMSet, nodeName types.NodeName, future *azure.Future, updateErr error) (err error) {
+	err = updateErr
+
+	if err == nil {
+		err = vmset.WaitForUpdateResult(ctx, future, nodeName, "attach_disk")
+	}
+
+	if vmUpdateRequired(future, err) {
+		if derr := kwait.ExponentialBackoffWithContext(ctx, updateVMBackoff, func() (bool, error) {
+			klog.Errorf("Retry VM Update on node (%s) due to error (%v)", nodeName, err)
+			future, err = vmset.UpdateVMAsync(ctx, nodeName)
+			if err == nil {
+				err = vmset.WaitForUpdateResult(ctx, future, nodeName, "attach_disk")
+			}
+			return !vmUpdateRequired(future, err), nil
+		}); derr != nil {
+			err = derr
+			return
+		}
+	}
+
+	if err != nil && configAccepted(future) {
+		err = retry.NewPartialUpdateError(err.Error())
+	}
+
+	return
 }
 
 func (c *controllerCommon) insertAttachDiskRequest(diskURI, nodeName string, options *AttachDiskOptions) error {
@@ -359,7 +415,7 @@ func (c *controllerCommon) DetachDisk(ctx context.Context, diskName, diskURI str
 	} else {
 		lun, _, errGetLun := c.GetDiskLun(diskName, diskURI, nodeName)
 		if errGetLun == nil || !strings.Contains(errGetLun.Error(), consts.CannotFindDiskLUN) {
-			return fmt.Errorf("disk(%s) is still attached to node(%s) on lun(%d), error: %v", diskURI, nodeName, lun, errGetLun)
+			return fmt.Errorf("disk(%s) is still attached to node(%s) on lun(%d), error: %w", diskURI, nodeName, lun, errGetLun)
 		}
 	}
 
@@ -620,6 +676,11 @@ func (c *controllerCommon) checkDiskExists(ctx context.Context, diskURI string)
 	return true, nil
 }
 
+func vmUpdateRequired(future *azure.Future, err error) bool {
+	errCode := getAzureErrorCode(err)
+	return configAccepted(future) && errCode == consts.OperationPreemptedErrorCode
+}
+
 func getValidCreationData(subscriptionID, resourceGroup, sourceResourceID, sourceType string) (compute.CreationData, error) {
 	if sourceResourceID == "" {
 		return compute.CreationData{
@@ -663,3 +724,21 @@ func isInstanceNotFoundError(err error) bool {
 	}
 	return strings.Contains(errMsg, errStatusCode400) && strings.Contains(errMsg, errInvalidParameter) && strings.Contains(errMsg, errTargetInstanceIds)
 }
+
+// getAzureErrorCode uses regex to parse out the error code encapsulated in the error string.
+func getAzureErrorCode(err error) string {
+	if err == nil {
+		return ""
+	}
+	matches := errorCodeRE.FindStringSubmatch(err.Error())
+	if matches == nil {
+		return ""
+	}
+	return matches[1]
+}
+
+// configAccepted returns true if storage profile change had been committed (i.e. HTTP status code == 2xx) and returns false otherwise.
+func configAccepted(future *azure.Future) bool {
+	// if status code indicates success, the storage profile change was committed
+	return future != nil && future.Response() != nil && future.Response().StatusCode/100 == 2
+}
diff --git a/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_controller_standard.go b/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_controller_standard.go
index 16ae72ec98..58efbd7c3b 100644
--- a/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_controller_standard.go
+++ b/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_controller_standard.go
@@ -31,6 +31,7 @@ import (
 
 	azcache "sigs.k8s.io/cloud-provider-azure/pkg/cache"
 	"sigs.k8s.io/cloud-provider-azure/pkg/consts"
+	"sigs.k8s.io/cloud-provider-azure/pkg/retry"
 )
 
 // AttachDisk attaches a disk to vm
@@ -100,10 +101,6 @@ func (as *availabilitySet) AttachDisk(ctx context.Context, nodeName types.NodeNa
 		},
 	}
 	klog.V(2).Infof("azureDisk - update(%s): vm(%s) - attach disk list(%s)", nodeResourceGroup, vmName, diskMap)
-	// Invalidate the cache right after updating
-	defer func() {
-		_ = as.DeleteCacheForNode(vmName)
-	}()
 
 	future, rerr := as.VirtualMachinesClient.UpdateAsync(ctx, nodeResourceGroup, vmName, newVM, "attach_disk")
 	if rerr != nil {
@@ -124,8 +121,13 @@ func (as *availabilitySet) AttachDisk(ctx context.Context, nodeName types.NodeNa
 }
 
 func (as *availabilitySet) DeleteCacheForNode(nodeName string) error {
-	_ = as.cloud.vmCache.Delete(nodeName)
-	return nil
+	err := as.cloud.vmCache.Delete(nodeName)
+	if err == nil {
+		klog.V(2).Infof("DeleteCacheForNode(%s) successfully", nodeName)
+	} else {
+		klog.Errorf("DeleteCacheForNode(%s) failed with %v", nodeName, err)
+	}
+	return err
 }
 
 // WaitForUpdateResult waits for the response of the update request
@@ -136,9 +138,17 @@ func (as *availabilitySet) WaitForUpdateResult(ctx context.Context, future *azur
 		return err
 	}
 
-	if rerr := as.VirtualMachinesClient.WaitForUpdateResult(ctx, future, nodeResourceGroup, source); rerr != nil {
+	result, rerr := as.VirtualMachinesClient.WaitForUpdateResult(ctx, future, nodeResourceGroup, source)
+	if rerr != nil {
 		return rerr.Error()
 	}
+
+	// clean node cache first and then update cache
+	_ = as.DeleteCacheForNode(vmName)
+	if result != nil && result.VirtualMachineProperties != nil {
+		// if we have an updated result, we update the vmss vm cache
+		as.updateCache(vmName, result)
+	}
 	return nil
 }
 
@@ -198,19 +208,28 @@ func (as *availabilitySet) DetachDisk(ctx context.Context, nodeName types.NodeNa
 		},
 	}
 	klog.V(2).Infof("azureDisk - update(%s): vm(%s) - detach disk list(%s)", nodeResourceGroup, vmName, nodeName, diskMap)
-	// Invalidate the cache right after updating
+
+	var result *compute.VirtualMachine
+	var rerr *retry.Error
 	defer func() {
+		// invalidate the cache right after updating
 		_ = as.DeleteCacheForNode(vmName)
+
+		// update the cache with the updated result only if its not nil
+		// and contains the VirtualMachineProperties
+		if rerr == nil && result != nil && result.VirtualMachineProperties != nil {
+			as.updateCache(vmName, result)
+		}
 	}()
 
-	rerr := as.VirtualMachinesClient.Update(ctx, nodeResourceGroup, vmName, newVM, "detach_disk")
+	result, rerr = as.VirtualMachinesClient.Update(ctx, nodeResourceGroup, vmName, newVM, "detach_disk")
 	if rerr != nil {
 		klog.Errorf("azureDisk - detach disk list(%s) on rg(%s) vm(%s) failed, err: %v", diskMap, nodeResourceGroup, vmName, rerr)
 		if rerr.HTTPStatusCode == http.StatusNotFound {
 			klog.Errorf("azureDisk - begin to filterNonExistingDisks(%v) on rg(%s) vm(%s)", diskMap, nodeResourceGroup, vmName)
 			disks := as.filterNonExistingDisks(ctx, *vm.StorageProfile.DataDisks)
 			newVM.VirtualMachineProperties.StorageProfile.DataDisks = &disks
-			rerr = as.VirtualMachinesClient.Update(ctx, nodeResourceGroup, vmName, newVM, "detach_disk")
+			result, rerr = as.VirtualMachinesClient.Update(ctx, nodeResourceGroup, vmName, newVM, "detach_disk")
 		}
 	}
 
@@ -223,23 +242,47 @@ func (as *availabilitySet) DetachDisk(ctx context.Context, nodeName types.NodeNa
 
 // UpdateVM updates a vm
 func (as *availabilitySet) UpdateVM(ctx context.Context, nodeName types.NodeName) error {
+	future, err := as.UpdateVMAsync(ctx, nodeName)
+	if err != nil {
+		return err
+	}
+
+	return as.WaitForUpdateResult(ctx, future, nodeName, "update_vm")
+}
+
+// UpdateVMAsync updates a vm asynchronously
+func (as *availabilitySet) UpdateVMAsync(ctx context.Context, nodeName types.NodeName) (*azure.Future, error) {
 	vmName := mapNodeNameToVMName(nodeName)
 	nodeResourceGroup, err := as.GetNodeResourceGroup(vmName)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	klog.V(2).Infof("azureDisk - update(%s): vm(%s)", nodeResourceGroup, vmName)
-	// Invalidate the cache right after updating
+
+	var result *compute.VirtualMachine
+	var rerr *retry.Error
 	defer func() {
+		// invalidate the cache right after updating
 		_ = as.DeleteCacheForNode(vmName)
+
+		// update the cache with the updated result only if its not nil
+		// and contains the VirtualMachineProperties
+		if rerr == nil && result != nil && result.VirtualMachineProperties != nil {
+			as.updateCache(vmName, result)
+		}
 	}()
 
-	rerr := as.VirtualMachinesClient.Update(ctx, nodeResourceGroup, vmName, compute.VirtualMachineUpdate{}, "update_vm")
+	future, rerr := as.VirtualMachinesClient.UpdateAsync(ctx, nodeResourceGroup, vmName, compute.VirtualMachineUpdate{}, "update_vm")
 	klog.V(2).Infof("azureDisk - update(%s): vm(%s) - returned with %v", nodeResourceGroup, vmName, rerr)
 	if rerr != nil {
-		return rerr.Error()
+		return future, rerr.Error()
 	}
-	return nil
+	return future, nil
+}
+
+func (as *availabilitySet) updateCache(nodeName string, vm *compute.VirtualMachine) {
+	as.cloud.vmCache.Update(nodeName, vm)
+	klog.V(2).Infof("updateCache(%s) successfully", nodeName)
 }
 
 // GetDataDisks gets a list of data disks attached to the node.
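Aside: the retry flow added in azure_controller_common.go above hinges on two pieces, the errorCodeRE regex that extracts the service error code from an error string and the updateVMBackoff schedule passed to kwait.ExponentialBackoffWithContext. The standalone Go sketch below is not part of the patch; fakeUpdate and the sample error string are invented for illustration, and it assumes the same apimachinery version vendored here, where the condition function takes no context argument (newer releases expect a context-aware condition). It shows how an OperationPreempted error keeps the backoff loop retrying until the parsed code changes.

package main

import (
	"context"
	"fmt"
	"regexp"
	"time"

	kwait "k8s.io/apimachinery/pkg/util/wait"
)

// Same regex as the errorCodeRE added in azure_controller_common.go.
var errorCodeRE = regexp.MustCompile(`Code="(.*?)".*`)

// getAzureErrorCode mirrors the helper added in the patch: it pulls the
// service error code out of the error text, or returns "" when none is found.
func getAzureErrorCode(err error) string {
	if err == nil {
		return ""
	}
	matches := errorCodeRE.FindStringSubmatch(err.Error())
	if matches == nil {
		return ""
	}
	return matches[1]
}

func main() {
	// Same shape as updateVMBackoff above: 1s initial delay, factor 3, 5 attempts,
	// i.e. waits of roughly 1s, 3s, 9s, 27s between tries.
	backoff := kwait.Backoff{Duration: time.Second, Factor: 3.0, Steps: 5}

	attempts := 0
	// fakeUpdate stands in for UpdateVMAsync + WaitForUpdateResult; it fails twice
	// with a preempted error and then succeeds.
	fakeUpdate := func() error {
		attempts++
		if attempts < 3 {
			return fmt.Errorf(`Retriable: false, RetryAfter: 0s, HTTPStatusCode: 200, RawError: Code="OperationPreempted" Message="Operation was preempted"`)
		}
		return nil
	}

	err := kwait.ExponentialBackoffWithContext(context.Background(), backoff, func() (bool, error) {
		uerr := fakeUpdate()
		// Keep retrying only while the parsed error code is OperationPreempted.
		return getAzureErrorCode(uerr) != "OperationPreempted", nil
	})
	fmt.Println("attempts:", attempts, "err:", err)
}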
diff --git a/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_controller_vmss.go b/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_controller_vmss.go
index 922bae5884..dd7a6410ff 100644
--- a/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_controller_vmss.go
+++ b/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_controller_vmss.go
@@ -107,11 +107,6 @@ func (ss *ScaleSet) AttachDisk(ctx context.Context, nodeName types.NodeName, dis
 		},
 	}
 
-	// Invalidate the cache right after updating
-	defer func() {
-		_ = ss.DeleteCacheForNode(vmName)
-	}()
-
 	klog.V(2).Infof("azureDisk - update(%s): vm(%s) - attach disk list(%s)", nodeResourceGroup, nodeName, diskMap)
 	future, rerr := ss.VirtualMachineScaleSetVMsClient.UpdateAsync(ctx, nodeResourceGroup, vm.VMSSName, vm.InstanceID, newVM, "attach_disk")
 	if rerr != nil {
@@ -139,23 +134,30 @@ func (ss *ScaleSet) WaitForUpdateResult(ctx context.Context, future *azure.Futur
 		return err
 	}
 
-	var result *compute.VirtualMachineScaleSetVM
-	var rerr *retry.Error
-	defer func() {
-		if rerr == nil && result != nil && result.VirtualMachineScaleSetVMProperties != nil {
-			// If we have an updated result, we update the vmss vm cache
-			vm, err := ss.getVmssVM(vmName, azcache.CacheReadTypeDefault)
-			if err != nil {
-				return
-			}
-			_ = ss.updateCache(vmName, nodeResourceGroup, vm.VMSSName, vm.InstanceID, result)
-		}
-	}()
-
-	result, rerr = ss.VirtualMachineScaleSetVMsClient.WaitForUpdateResult(ctx, future, nodeResourceGroup, source)
+	result, rerr := ss.VirtualMachineScaleSetVMsClient.WaitForUpdateResult(ctx, future, nodeResourceGroup, source)
 	if rerr != nil {
 		return rerr.Error()
 	}
+
+	var vmssName, instanceID string
+	if result != nil && result.VirtualMachineScaleSetVMProperties != nil {
+		// get vmssName, instanceID from cache first
+		vm, err := ss.getVmssVM(vmName, azcache.CacheReadTypeDefault)
+		if err == nil && vm != nil {
+			vmssName = vm.VMSSName
+			instanceID = vm.InstanceID
+		} else {
+			klog.Errorf("getVmssVM failed with error(%v) or nil pointer", err)
+		}
+	}
+
+	// clean node cache first and then update cache
+	_ = ss.DeleteCacheForNode(vmName)
+	if vmssName != "" && instanceID != "" {
+		if err := ss.updateCache(vmName, nodeResourceGroup, vmssName, instanceID, result); err != nil {
+			klog.Errorf("updateCache(%s, %s, %s, %s) failed with error: %v", vmName, nodeResourceGroup, vmssName, instanceID, err)
+		}
+	}
 	return nil
 }
 
@@ -219,30 +221,23 @@ func (ss *ScaleSet) DetachDisk(ctx context.Context, nodeName types.NodeName, dis
 		},
 	}
 
-	var updateResult *compute.VirtualMachineScaleSetVM
+	var result *compute.VirtualMachineScaleSetVM
 	var rerr *retry.Error
 	defer func() {
-		// If there is an error with Update operation,
-		// invalidate the cache
-		if rerr != nil {
-			_ = ss.DeleteCacheForNode(vmName)
-			return
-		}
+		_ = ss.DeleteCacheForNode(vmName)
 
 		// Update the cache with the updated result only if its not nil
 		// and contains the VirtualMachineScaleSetVMProperties
-		if updateResult != nil && updateResult.VirtualMachineScaleSetVMProperties != nil {
-			if err := ss.updateCache(vmName, nodeResourceGroup, vm.VMSSName, vm.InstanceID, updateResult); err != nil {
-				klog.Errorf("updateCache(%s, %s, %s) failed with error: %v", vmName, nodeResourceGroup, vm.VMSSName, vm.InstanceID, err)
-				// if err faced during updating cache, invalidate the cache
-				_ = ss.DeleteCacheForNode(vmName)
+		if rerr == nil && result != nil && result.VirtualMachineScaleSetVMProperties != nil {
+			if err := ss.updateCache(vmName, nodeResourceGroup, vm.VMSSName, vm.InstanceID, result); err != nil {
+				klog.Errorf("updateCache(%s, %s, %s, %s) failed with error: %v", vmName, nodeResourceGroup, vm.VMSSName, vm.InstanceID, err)
 			}
 		}
 	}()
 
 	klog.V(2).Infof("azureDisk - update(%s): vm(%s) - detach disk list(%s)", nodeResourceGroup, nodeName, diskMap)
-	updateResult, rerr = ss.VirtualMachineScaleSetVMsClient.Update(ctx, nodeResourceGroup, vm.VMSSName, vm.InstanceID, newVM,
+	result, rerr = ss.VirtualMachineScaleSetVMsClient.Update(ctx, nodeResourceGroup, vm.VMSSName, vm.InstanceID, newVM,
 		"detach_disk")
 	if rerr != nil {
 		klog.Errorf("azureDisk - detach disk list(%s) on rg(%s) vm(%s) failed, err: %v", diskMap, nodeResourceGroup, nodeName, rerr)
@@ -250,7 +245,7 @@ func (ss *ScaleSet) DetachDisk(ctx context.Context, nodeName types.NodeName, dis
 			klog.Errorf("azureDisk - begin to filterNonExistingDisks(%v) on rg(%s) vm(%s)", diskMap, nodeResourceGroup, nodeName)
 			disks := ss.filterNonExistingDisks(ctx, *newVM.VirtualMachineScaleSetVMProperties.StorageProfile.DataDisks)
 			newVM.VirtualMachineScaleSetVMProperties.StorageProfile.DataDisks = &disks
-			updateResult, rerr = ss.VirtualMachineScaleSetVMsClient.Update(ctx, nodeResourceGroup, vm.VMSSName, vm.InstanceID, newVM, "detach_disk")
+			result, rerr = ss.VirtualMachineScaleSetVMsClient.Update(ctx, nodeResourceGroup, vm.VMSSName, vm.InstanceID, newVM, "detach_disk")
 		}
 	}
 
@@ -263,20 +258,31 @@ func (ss *ScaleSet) DetachDisk(ctx context.Context, nodeName types.NodeName, dis
 
 // UpdateVM updates a vm
 func (ss *ScaleSet) UpdateVM(ctx context.Context, nodeName types.NodeName) error {
+	future, err := ss.UpdateVMAsync(ctx, nodeName)
+	if err != nil {
+		return err
+	}
+
+	return ss.WaitForUpdateResult(ctx, future, nodeName, "update_vm")
+}
+
+// UpdateVMAsync updates a vm asynchronously
+func (ss *ScaleSet) UpdateVMAsync(ctx context.Context, nodeName types.NodeName) (*azure.Future, error) {
 	vmName := mapNodeNameToVMName(nodeName)
 
 	vm, err := ss.getVmssVM(vmName, azcache.CacheReadTypeDefault)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	nodeResourceGroup, err := ss.GetNodeResourceGroup(vmName)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
-	var updateResult *compute.VirtualMachineScaleSetVM
+	var future *azure.Future
 	var rerr *retry.Error
+	// Invalidate the cache right after updating
 	defer func() {
 		// If there is an error with Update operation,
 		// invalidate the cache
@@ -284,26 +290,16 @@ func (ss *ScaleSet) UpdateVM(ctx context.Context, nodeName types.NodeName) error
 			_ = ss.DeleteCacheForNode(vmName)
 			return
 		}
-
-		// Update the cache with the updated result only if its not nil
-		// and contains the VirtualMachineScaleSetVMProperties
-		if updateResult != nil && updateResult.VirtualMachineScaleSetVMProperties != nil {
-			if err := ss.updateCache(vmName, nodeResourceGroup, vm.VMSSName, vm.InstanceID, updateResult); err != nil {
-				klog.Errorf("updateCache(%s, %s, %s) failed with error: %v", vmName, nodeResourceGroup, vm.VMSSName, vm.InstanceID, err)
-				// if err faced during updating cache, invalidate the cache
-				_ = ss.DeleteCacheForNode(vmName)
-			}
-		}
 	}()
 
 	klog.V(2).Infof("azureDisk - update(%s): vm(%s)", nodeResourceGroup, nodeName)
-	updateResult, rerr = ss.VirtualMachineScaleSetVMsClient.Update(ctx, nodeResourceGroup, vm.VMSSName, vm.InstanceID, compute.VirtualMachineScaleSetVM{}, "update_vmss_instance")
+	future, rerr = ss.VirtualMachineScaleSetVMsClient.UpdateAsync(ctx, nodeResourceGroup, vm.VMSSName, vm.InstanceID, compute.VirtualMachineScaleSetVM{}, "update_vmss_instance")
 	klog.V(2).Infof("azureDisk - update(%s): vm(%s) - returned with %v", nodeResourceGroup, nodeName, rerr)
 	if rerr != nil {
-		return rerr.Error()
+		return future, rerr.Error()
 	}
-	return nil
+	return future, nil
 }
 
 // GetDataDisks gets a list of data disks attached to the node.
diff --git a/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_controller_vmssflex.go b/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_controller_vmssflex.go
index 42964e2cde..2832b40eba 100644
--- a/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_controller_vmssflex.go
+++ b/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_controller_vmssflex.go
@@ -21,14 +21,17 @@ import (
 	"fmt"
 	"net/http"
 	"strings"
+	"sync"
 
 	"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2022-03-01/compute"
 	"github.com/Azure/go-autorest/autorest/azure"
 	"github.com/Azure/go-autorest/autorest/to"
 
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/klog/v2"
 
+	azcache "sigs.k8s.io/cloud-provider-azure/pkg/cache"
 	"sigs.k8s.io/cloud-provider-azure/pkg/consts"
+	"sigs.k8s.io/cloud-provider-azure/pkg/retry"
 )
 
 // AttachDisk attaches a disk to vm
@@ -98,10 +101,6 @@ func (fs *FlexScaleSet) AttachDisk(ctx context.Context, nodeName types.NodeName,
 		},
 	}
 
-	defer func() {
-		_ = fs.DeleteCacheForNode(vmName)
-	}()
-
 	klog.V(2).Infof("azureDisk - update(%s): vm(%s) - attach disk list(%s)", nodeResourceGroup, vmName, diskMap)
 
 	future, rerr := fs.VirtualMachinesClient.UpdateAsync(ctx, nodeResourceGroup, *vm.Name, newVM, "attach_disk")
@@ -178,20 +177,30 @@ func (fs *FlexScaleSet) DetachDisk(ctx context.Context, nodeName types.NodeName,
 		},
 	}
 
+	var result *compute.VirtualMachine
+	var rerr *retry.Error
 	defer func() {
 		_ = fs.DeleteCacheForNode(vmName)
+
+		// update the cache with the updated result only if its not nil
+		// and contains the VirtualMachineProperties
+		if rerr == nil && result != nil && result.VirtualMachineProperties != nil {
+			if err := fs.updateCache(vmName, result); err != nil {
+				klog.Errorf("updateCache(%s) failed with error: %v", vmName, err)
+			}
+		}
 	}()
 
 	klog.V(2).Infof("azureDisk - update(%s): vm(%s) - detach disk list(%s)", nodeResourceGroup, vmName, nodeName, diskMap)
-	rerr := fs.VirtualMachinesClient.Update(ctx, nodeResourceGroup, *vm.Name, newVM, "detach_disk")
+	result, rerr = fs.VirtualMachinesClient.Update(ctx, nodeResourceGroup, *vm.Name, newVM, "detach_disk")
 	if rerr != nil {
 		klog.Errorf("azureDisk - detach disk list(%s) on rg(%s) vm(%s) failed, err: %v", diskMap, nodeResourceGroup, vmName, rerr)
 		if rerr.HTTPStatusCode == http.StatusNotFound {
 			klog.Errorf("azureDisk - begin to filterNonExistingDisks(%v) on rg(%s) vm(%s)", diskMap, nodeResourceGroup, vmName)
 			disks := fs.filterNonExistingDisks(ctx, *vm.StorageProfile.DataDisks)
 			newVM.VirtualMachineProperties.StorageProfile.DataDisks = &disks
-			rerr = fs.VirtualMachinesClient.Update(ctx, nodeResourceGroup, *vm.Name, newVM, "detach_disk")
+			result, rerr = fs.VirtualMachinesClient.Update(ctx, nodeResourceGroup, *vm.Name, newVM, "detach_disk")
 		}
 	}
 
@@ -209,37 +218,102 @@ func (fs *FlexScaleSet) WaitForUpdateResult(ctx context.Context, future *azure.F
 	if err != nil {
 		return err
 	}
-	if rerr := fs.VirtualMachinesClient.WaitForUpdateResult(ctx, future, nodeResourceGroup, source); rerr != nil {
+	result, rerr := fs.VirtualMachinesClient.WaitForUpdateResult(ctx, future, nodeResourceGroup, source)
+	if rerr != nil {
 		return rerr.Error()
 	}
+
+	// clean node cache first and then update cache
+	_ = fs.DeleteCacheForNode(vmName)
+	if result != nil && result.VirtualMachineProperties != nil {
+		if err := fs.updateCache(vmName, result); err != nil {
+			klog.Errorf("updateCache(%s) failed with error: %v", vmName, err)
+		}
+	}
 	return nil
 }
 
 // UpdateVM updates a vm
 func (fs *FlexScaleSet) UpdateVM(ctx context.Context, nodeName types.NodeName) error {
+	future, err := fs.UpdateVMAsync(ctx, nodeName)
+	if err != nil {
+		return err
+	}
+
+	return fs.WaitForUpdateResult(ctx, future, nodeName, "update_vm")
+}
+
+// UpdateVMAsync updates a vm asynchronously
+func (fs *FlexScaleSet) UpdateVMAsync(ctx context.Context, nodeName types.NodeName) (*azure.Future, error) {
 	vmName := mapNodeNameToVMName(nodeName)
+
 	vm, err := fs.getVmssFlexVM(vmName, azcache.CacheReadTypeDefault)
 	if err != nil {
 		// if host doesn't exist, no need to update
 		klog.Warningf("azureDisk - cannot find node %s, skip updating vm)", nodeName)
-		return nil
+		return nil, nil
 	}
 	nodeResourceGroup, err := fs.GetNodeResourceGroup(vmName)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
+	var result *compute.VirtualMachine
+	var rerr *retry.Error
 	defer func() {
 		_ = fs.DeleteCacheForNode(vmName)
+
+		// update the cache with the updated result only if its not nil
+		// and contains the VirtualMachineProperties
+		if rerr == nil && result != nil && result.VirtualMachineProperties != nil {
+			if err := fs.updateCache(vmName, result); err != nil {
+				klog.Errorf("updateCache(%s) failed with error: %v", vmName, err)
+			}
+		}
 	}()
 
 	klog.V(2).Infof("azureDisk - update(%s): vm(%s)", nodeResourceGroup, vmName)
-	rerr := fs.VirtualMachinesClient.Update(ctx, nodeResourceGroup, *vm.Name, compute.VirtualMachineUpdate{}, "update_vm")
+	future, rerr := fs.VirtualMachinesClient.UpdateAsync(ctx, nodeResourceGroup, *vm.Name, compute.VirtualMachineUpdate{}, "update_vm")
 	klog.V(2).Infof("azureDisk - update(%s): vm(%s) - returned with %v", nodeResourceGroup, vmName, rerr)
 	if rerr != nil {
-		return rerr.Error()
+		return future, rerr.Error()
+	}
+	return future, nil
+}
+
+func (fs *FlexScaleSet) updateCache(nodeName string, vm *compute.VirtualMachine) error {
+	if vm == nil {
+		return fmt.Errorf("vm is nil")
+	}
+	if vm.Name == nil {
+		return fmt.Errorf("vm.Name is nil")
+	}
+	if vm.VirtualMachineProperties == nil {
+		return fmt.Errorf("vm.VirtualMachineProperties is nil")
 	}
+	if vm.OsProfile == nil || vm.OsProfile.ComputerName == nil {
+		return fmt.Errorf("vm.OsProfile.ComputerName is nil")
+	}
+
+	vmssFlexID, err := fs.getNodeVmssFlexID(nodeName)
+	if err != nil {
+		return err
+	}
+
+	fs.lockMap.LockEntry(vmssFlexID)
+	defer fs.lockMap.UnlockEntry(vmssFlexID)
+	cached, err := fs.vmssFlexVMCache.Get(vmssFlexID, azcache.CacheReadTypeDefault)
+	if err != nil {
+		return err
+	}
+	vmMap := cached.(*sync.Map)
+	vmMap.Store(nodeName, vm)
+	fs.vmssFlexVMCache.Update(vmssFlexID, vmMap)
+
+	fs.vmssFlexVMNameToVmssID.Store(strings.ToLower(*vm.OsProfile.ComputerName), vmssFlexID)
+	fs.vmssFlexVMNameToNodeName.Store(*vm.Name, strings.ToLower(*vm.OsProfile.ComputerName))
+	klog.V(2).Infof("updateCache(%s) for vmssFlexID(%s) successfully", nodeName, vmssFlexID)
 	return nil
 }
diff --git a/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_vmsets.go b/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_vmsets.go
index d829587960..e2935ffdc1 100644
--- a/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_vmsets.go
+++ b/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_vmsets.go
@@ -88,6 +88,9 @@ type VMSet interface {
 	// UpdateVM updates a vm
 	UpdateVM(ctx context.Context, nodeName types.NodeName) error
 
+	// UpdateVMAsync updates a vm asynchronously
+	UpdateVMAsync(ctx context.Context, nodeName types.NodeName) (*azure.Future, error)
+
 	// GetPowerStatusByNodeName returns the powerState for the specified node.
 	GetPowerStatusByNodeName(name string) (string, error)
diff --git a/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_vmss_cache.go b/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_vmss_cache.go
index 644a15718d..eec1d9763f 100644
--- a/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_vmss_cache.go
+++ b/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_vmss_cache.go
@@ -152,11 +152,6 @@ func (ss *ScaleSet) getVMSSVMsFromCache(resourceGroup, vmssName string, crt azca
 	return virtualMachines, nil
 }
 
-// gcVMSSVMCache delete stale VMSS VMs caches from deleted VMSSes.
-func (ss *ScaleSet) gcVMSSVMCache() error {
-	return ss.vmssCache.Delete(consts.VMSSKey)
-}
-
 // newVMSSVirtualMachinesCache instantiates a new VMs cache for VMs belonging to the provided VMSS.
 func (ss *ScaleSet) newVMSSVirtualMachinesCache() (*azcache.TimedCache, error) {
 	vmssVirtualMachinesCacheTTL := time.Duration(ss.Config.VmssVirtualMachinesCacheTTLInSeconds) * time.Second
@@ -261,7 +256,7 @@ func (ss *ScaleSet) newVMSSVirtualMachinesCache() (*azcache.TimedCache, error) {
 func (ss *ScaleSet) DeleteCacheForNode(nodeName string) error {
 	vmManagementType, err := ss.getVMManagementTypeByNodeName(nodeName, azcache.CacheReadTypeUnsafe)
 	if err != nil {
-		klog.Errorf("Failed to check VM management type: %v", err)
+		klog.Errorf("getVMManagementTypeByNodeName(%s) failed with %v", nodeName, err)
 		return err
 	}
 
@@ -276,21 +271,23 @@ func (ss *ScaleSet) DeleteCacheForNode(nodeName string) error {
 
 	node, err := ss.getNodeIdentityByNodeName(nodeName, azcache.CacheReadTypeUnsafe)
 	if err != nil {
-		klog.Errorf("DeleteCacheForNode(%s) failed with error: %v", nodeName, err)
+		klog.Errorf("getNodeIdentityByNodeName(%s) failed with %v", nodeName, err)
 		return err
 	}
 
+	// get sync.Map cache and remove the node from the cache
+	cacheKey := getVMSSVMCacheKey(node.resourceGroup, node.vmssName)
+	ss.lockMap.LockEntry(cacheKey)
+	defer ss.lockMap.UnlockEntry(cacheKey)
-	err = ss.vmssVMCache.Delete(getVMSSVMCacheKey(node.resourceGroup, node.vmssName))
+	virtualMachines, err := ss.getVMSSVMsFromCache(node.resourceGroup, node.vmssName, azcache.CacheReadTypeUnsafe)
 	if err != nil {
-		klog.Errorf("DeleteCacheForNode(%s) failed to remove from vmssVMCache with error: %v", nodeName, err)
+		klog.Errorf("getVMSSVMsFromCache(%s, %s) failed with %v", node.resourceGroup, node.vmssName, err)
 		return err
 	}
 
-	// Delete in VMSS VM cache
-	if err := ss.gcVMSSVMCache(); err != nil {
-		klog.Errorf("DeleteCacheForNode(%s) failed to gc stale vmss caches: %v", nodeName, err)
-	}
-
+	virtualMachines.Delete(nodeName)
+	ss.vmssVMCache.Update(cacheKey, virtualMachines)
+	klog.V(2).Infof("DeleteCacheForNode(%s, %s, %s) successfully", node.resourceGroup, node.vmssName, nodeName)
 	return nil
 }
 
@@ -302,12 +299,9 @@ func (ss *ScaleSet) updateCache(nodeName, resourceGroupName, vmssName, instanceI
 
 	virtualMachines, err := ss.getVMSSVMsFromCache(resourceGroupName, vmssName, azcache.CacheReadTypeUnsafe)
 	if err != nil {
-		err = fmt.Errorf("updateCache(%s, %s, %s) failed getting vmCache with error: %v", vmssName, resourceGroupName, nodeName, err)
-		return err
+		return fmt.Errorf("updateCache(%s, %s, %s) failed getting vmCache with error: %w", vmssName, resourceGroupName, nodeName, err)
 	}
 
-	localCache := &sync.Map{}
-
 	vmssVMCacheEntry := &VMSSVirtualMachinesEntry{
 		ResourceGroup: resourceGroupName,
 		VMSSName:      vmssName,
@@ -316,8 +310,10 @@ func (ss *ScaleSet) updateCache(nodeName, resourceGroupName, vmssName, instanceI
 		LastUpdate:    time.Now().UTC(),
 	}
 
+	localCache := &sync.Map{}
 	localCache.Store(nodeName, vmssVMCacheEntry)
 
+	// copy all elements except current VM to localCache
 	virtualMachines.Range(func(key, value interface{}) bool {
 		if key.(string) != nodeName {
 			localCache.Store(key.(string), value.(*VMSSVirtualMachinesEntry))
@@ -326,7 +322,7 @@ func (ss *ScaleSet) updateCache(nodeName, resourceGroupName, vmssName, instanceI
 	})
 
 	ss.vmssVMCache.Update(cacheKey, localCache)
-	klog.V(4).Infof("updateCache(%s, %s, %s) for cacheKey(%s) updated successfully", vmssName, resourceGroupName, nodeName, cacheKey)
+	klog.V(2).Infof("updateCache(%s, %s, %s) for cacheKey(%s) updated successfully", vmssName, resourceGroupName, nodeName, cacheKey)
 
 	return nil
 }
diff --git a/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_vmssflex_cache.go b/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_vmssflex_cache.go
index 6b6a8d131d..9371f9f83f 100644
--- a/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_vmssflex_cache.go
+++ b/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_vmssflex_cache.go
@@ -18,6 +18,7 @@ package provider
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"strings"
 	"sync"
@@ -156,7 +157,7 @@ func (fs *FlexScaleSet) getNodeNameByVMName(vmName string) (string, error) {
 	}
 
 	nodeName, err := getter(vmName, azcache.CacheReadTypeDefault)
-	if err == cloudprovider.InstanceNotFound {
+	if errors.Is(err, cloudprovider.InstanceNotFound) {
 		klog.V(2).Infof("Could not find node (%s) in the existing cache. Forcely freshing the cache to check again...", nodeName)
 		return getter(vmName, azcache.CacheReadTypeForceRefresh)
 	}
@@ -197,7 +198,7 @@ func (fs *FlexScaleSet) getNodeVmssFlexID(nodeName string) (string, error) {
 	}
 
 	vmssFlexID, err := getter(nodeName, azcache.CacheReadTypeDefault)
-	if err == cloudprovider.InstanceNotFound {
+	if errors.Is(err, cloudprovider.InstanceNotFound) {
 		klog.V(2).Infof("Could not find node (%s) in the existing cache. Forcely freshing the cache to check again...", nodeName)
 		return getter(nodeName, azcache.CacheReadTypeForceRefresh)
 	}
@@ -316,17 +317,28 @@ func (fs *FlexScaleSet) getVmssFlexByName(vmssFlexName string) (*compute.Virtual
 func (fs *FlexScaleSet) DeleteCacheForNode(nodeName string) error {
 	vmssFlexID, err := fs.getNodeVmssFlexID(nodeName)
 	if err != nil {
+		klog.Errorf("getNodeVmssFlexID(%s) failed with %v", nodeName, err)
 		return err
 	}
 
+	fs.lockMap.LockEntry(vmssFlexID)
+	defer fs.lockMap.UnlockEntry(vmssFlexID)
 	cached, err := fs.vmssFlexVMCache.Get(vmssFlexID, azcache.CacheReadTypeDefault)
 	if err != nil {
+		klog.Errorf("vmssFlexVMCache.Get(%s, %s) failed with %v", vmssFlexID, nodeName, err)
+		return err
+	}
+	if cached == nil {
+		err := fmt.Errorf("nil cache returned from %s", vmssFlexID)
+		klog.Errorf("DeleteCacheForNode(%s, %s) failed with %v", vmssFlexID, nodeName, err)
 		return err
 	}
 	vmMap := cached.(*sync.Map)
 	vmMap.Delete(nodeName)
+	fs.vmssFlexVMCache.Update(vmssFlexID, vmMap)
 
 	fs.vmssFlexVMNameToVmssID.Delete(nodeName)
+	klog.V(2).Infof("DeleteCacheForNode(%s, %s) successfully", vmssFlexID, nodeName)
 
 	return nil
 }
diff --git a/vendor/sigs.k8s.io/cloud-provider-azure/pkg/retry/azure_error.go b/vendor/sigs.k8s.io/cloud-provider-azure/pkg/retry/azure_error.go
index 290faedf56..c7e9a71d14 100644
--- a/vendor/sigs.k8s.io/cloud-provider-azure/pkg/retry/azure_error.go
+++ b/vendor/sigs.k8s.io/cloud-provider-azure/pkg/retry/azure_error.go
@@ -352,7 +352,7 @@ func GetVMSSMetadataByRawError(err *Error) (string, string, error) {
 	reg := regexp.MustCompile(`.*/subscriptions/(?:.*)/resourceGroups/(.*)/providers/Microsoft.Compute/virtualMachineScaleSets/(.+).`)
 	matches := reg.FindStringSubmatch(err.ServiceErrorMessage())
 	if len(matches) != 3 {
-		return "", "", fmt.Errorf("GetVMSSMetadataByRawError: couldn't find a VMSS resource Id from error message %s", err.RawError)
+		return "", "", fmt.Errorf("GetVMSSMetadataByRawError: couldn't find a VMSS resource Id from error message %w", err.RawError)
 	}
 
 	return matches[1], matches[2], nil
@@ -425,3 +425,16 @@ func getOperationNotAllowedReason(msg string) string {
 	}
 	return OperationNotAllowed
 }
+
+// PartialUpdateError implements error interface. It is meant to be returned for errors with http status code of 2xx
+type PartialUpdateError struct {
+	message string
+}
+
+func NewPartialUpdateError(msg string) *PartialUpdateError {
+	return &PartialUpdateError{message: msg}
+}
+
+func (e *PartialUpdateError) Error() string {
+	return e.message
+}
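Aside: a hypothetical caller-side sketch (not part of the patch) of how code consuming these VM set helpers could branch on the new retry.PartialUpdateError, which marks failures that happen after the VM configuration change was already accepted with a 2xx status. classify is an invented helper; retry.PartialUpdateError and retry.NewPartialUpdateError come from the addition to azure_error.go above.

package main

import (
	"errors"
	"fmt"

	"sigs.k8s.io/cloud-provider-azure/pkg/retry"
)

// classify is an invented helper showing the intended caller-side branching.
func classify(err error) string {
	if err == nil {
		return "update succeeded"
	}
	var perr *retry.PartialUpdateError
	if errors.As(err, &perr) {
		// The PUT was accepted (2xx) but a later step failed, e.g. the operation
		// was preempted; the VM may still converge, so polling the VM state is
		// preferable to blindly re-sending the whole update.
		return fmt.Sprintf("partial update, consider polling: %v", perr)
	}
	return fmt.Sprintf("hard failure, retry the update: %v", err)
}

func main() {
	fmt.Println(classify(nil))
	fmt.Println(classify(retry.NewPartialUpdateError("OperationPreempted after the PUT was accepted")))
	fmt.Println(classify(errors.New("http 429: too many requests")))
}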