From 457ca2358afe7853069265ed02987bacec547a77 Mon Sep 17 00:00:00 2001 From: Daniel Vega-Myhre <105610547+danielvegamyhre@users.noreply.github.com> Date: Mon, 5 Dec 2022 11:32:27 -0800 Subject: [PATCH] Support for concurrent nodepool CRUD operations (#6748) --- .../resource_container_node_pool.go.erb | 148 ++++++++---------- .../resource_container_node_pool_test.go.erb | 90 +++++++++++ mmv1/third_party/terraform/utils/mutexkv.go | 23 ++- mmv1/third_party/terraform/utils/utils.go | 17 ++ 4 files changed, 191 insertions(+), 87 deletions(-) diff --git a/mmv1/third_party/terraform/resources/resource_container_node_pool.go.erb b/mmv1/third_party/terraform/resources/resource_container_node_pool.go.erb index 725a3c03e5ca..13646ef64b71 100644 --- a/mmv1/third_party/terraform/resources/resource_container_node_pool.go.erb +++ b/mmv1/third_party/terraform/resources/resource_container_node_pool.go.erb @@ -389,11 +389,21 @@ func (nodePoolInformation *NodePoolInformation) parent() string { ) } -func (nodePoolInformation *NodePoolInformation) lockKey() string { +func (nodePoolInformation *NodePoolInformation) clusterLockKey() string { return containerClusterMutexKey(nodePoolInformation.project, nodePoolInformation.location, nodePoolInformation.cluster) } +func (nodePoolInformation *NodePoolInformation) nodePoolLockKey(nodePoolName string) string { + return fmt.Sprintf( + "projects/%s/locations/%s/clusters/%s/nodePools/%s", + nodePoolInformation.project, + nodePoolInformation.location, + nodePoolInformation.cluster, + nodePoolName, + ) +} + func extractNodePoolInformation(d *schema.ResourceData, config *Config) (*NodePoolInformation, error) { cluster := d.Get("cluster").(string) @@ -441,8 +451,15 @@ func resourceContainerNodePoolCreate(d *schema.ResourceData, meta interface{}) e return err } - mutexKV.Lock(nodePoolInfo.lockKey()) - defer mutexKV.Unlock(nodePoolInfo.lockKey()) + // Acquire read-lock on cluster. + clusterLockKey := nodePoolInfo.clusterLockKey() + mutexKV.RLock(clusterLockKey) + defer mutexKV.RUnlock(clusterLockKey) + + // Acquire write-lock on nodepool. 
+ npLockKey := nodePoolInfo.nodePoolLockKey(nodePool.Name) + mutexKV.Lock(npLockKey) + defer mutexKV.Unlock(npLockKey) req := &container.CreateNodePoolRequest{ NodePool: nodePool, @@ -526,12 +543,6 @@ func resourceContainerNodePoolCreate(d *schema.ResourceData, meta interface{}) e return err } - //Check cluster is in running state - _, err = containerClusterAwaitRestingState(config, nodePoolInfo.project, nodePoolInfo.location, nodePoolInfo.cluster, userAgent, d.Timeout(schema.TimeoutCreate)) - if err != nil { - return err - } - state, err := containerNodePoolAwaitRestingState(config, d.Id(), nodePoolInfo.project, userAgent, d.Timeout(schema.TimeoutCreate)) if err != nil { return err @@ -616,12 +627,6 @@ func resourceContainerNodePoolUpdate(d *schema.ResourceData, meta interface{}) e } name := getNodePoolName(d.Id()) - //Check cluster is in running state - _, err = containerClusterAwaitRestingState(config, nodePoolInfo.project, nodePoolInfo.location, nodePoolInfo.cluster, userAgent, d.Timeout(schema.TimeoutCreate)) - if err != nil { - return err - } - _, err = containerNodePoolAwaitRestingState(config, nodePoolInfo.fullyQualifiedName(name), nodePoolInfo.project, userAgent, d.Timeout(schema.TimeoutUpdate)) if err != nil { return err @@ -660,16 +665,6 @@ func resourceContainerNodePoolDelete(d *schema.ResourceData, meta interface{}) e name := getNodePoolName(d.Id()) - //Check cluster is in running state - _, err = containerClusterAwaitRestingState(config, nodePoolInfo.project, nodePoolInfo.location, nodePoolInfo.cluster, userAgent, d.Timeout(schema.TimeoutCreate)) - if err != nil { - if isGoogleApiErrorWithCode(err, 404) { - log.Printf("[INFO] GKE cluster %s doesn't exist, skipping node pool %s deletion", nodePoolInfo.cluster, d.Id()) - return nil - } - return err - } - _, err = containerNodePoolAwaitRestingState(config, nodePoolInfo.fullyQualifiedName(name), nodePoolInfo.project, userAgent, d.Timeout(schema.TimeoutDelete)) if err != nil { // If the node pool doesn't get created and then we try to delete it, we get an error, @@ -682,9 +677,15 @@ func resourceContainerNodePoolDelete(d *schema.ResourceData, meta interface{}) e } } - mutexKV.Lock(nodePoolInfo.lockKey()) - defer mutexKV.Unlock(nodePoolInfo.lockKey()) + // Acquire read-lock on cluster. + clusterLockKey := nodePoolInfo.clusterLockKey() + mutexKV.RLock(clusterLockKey) + defer mutexKV.RUnlock(clusterLockKey) + // Acquire write-lock on nodepool. + npLockKey := nodePoolInfo.nodePoolLockKey(name) + mutexKV.Lock(npLockKey) + defer mutexKV.Unlock(npLockKey) timeout := d.Timeout(schema.TimeoutDelete) startTime := time.Now() @@ -1121,13 +1122,19 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node config := meta.(*Config) name := d.Get(prefix + "name").(string) - lockKey := nodePoolInfo.lockKey() - userAgent, err := generateUserAgentString(d, config.userAgent) if err != nil { return err } + // Acquire read-lock on cluster. + clusterLockKey := nodePoolInfo.clusterLockKey() + mutexKV.RLock(clusterLockKey) + defer mutexKV.RUnlock(clusterLockKey) + + // Nodepool write-lock will be acquired when update function is called. + npLockKey := nodePoolInfo.nodePoolLockKey(name) + if d.HasChange(prefix + "autoscaling") { update := &container.ClusterUpdate{ DesiredNodePoolId: name, @@ -1170,11 +1177,9 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node timeout) } - // Call update serially. 
- if err := lockedCall(lockKey, updateF); err != nil { - return err + if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { + return err } - log.Printf("[INFO] Updated autoscaling in Node Pool %s", d.Id()) } @@ -1210,9 +1215,8 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node timeout) } - // Call update serially. - if err := lockedCall(lockKey, updateF); err != nil { - return err + if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { + return err } log.Printf("[INFO] Updated logging_variant for node pool %s", name) @@ -1264,12 +1268,10 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node "updating GKE node pool tags", userAgent, timeout) } - - // Call update serially. - if err := lockedCall(lockKey, updateF); err != nil { - return err + + if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { + return err } - log.Printf("[INFO] Updated tags for node pool %s", name) } @@ -1304,8 +1306,8 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node } // Call update serially. - if err := lockedCall(lockKey, updateF); err != nil { - return err + if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { + return err } log.Printf("[INFO] Updated resource labels for node pool %s", name) @@ -1336,11 +1338,9 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node timeout) } - // Call update serially. - if err := lockedCall(lockKey, updateF); err != nil { - return err + if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { + return err } - log.Printf("[INFO] Updated image type in Node Pool %s", d.Id()) } @@ -1372,11 +1372,9 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node timeout) } - // Call update serially. - if err := lockedCall(lockKey, updateF); err != nil { - return err + if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { + return err } - log.Printf("[INFO] Updated workload_metadata_config for node pool %s", name) } @@ -1408,9 +1406,8 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node timeout) } - // Call update serially. - if err := lockedCall(lockKey, updateF); err != nil { - return err + if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { + return err } log.Printf("[INFO] Updated kubelet_config for node pool %s", name) @@ -1442,9 +1439,8 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node timeout) } - // Call update serially. - if err := lockedCall(lockKey, updateF); err != nil { - return err + if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { + return err } log.Printf("[INFO] Updated linux_node_config for node pool %s", name) @@ -1475,12 +1471,9 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node nodePoolInfo.location, "updating GKE node pool size", userAgent, timeout) } - - // Call update serially. 
- if err := lockedCall(lockKey, updateF); err != nil { - return err + if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { + return err } - log.Printf("[INFO] GKE node pool %s size has been updated to %d", name, newSize) } @@ -1513,11 +1506,9 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node nodePoolInfo.location, "updating GKE node pool management", userAgent, timeout) } - // Call update serially. - if err := lockedCall(lockKey, updateF); err != nil { - return err + if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { + return err } - log.Printf("[INFO] Updated management in Node Pool %s", name) } @@ -1542,12 +1533,9 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node nodePoolInfo.project, nodePoolInfo.location, "updating GKE node pool version", userAgent, timeout) } - - // Call update serially. - if err := lockedCall(lockKey, updateF); err != nil { - return err + if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { + return err } - log.Printf("[INFO] Updated version in Node Pool %s", name) } @@ -1570,11 +1558,9 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node return containerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, "updating GKE node pool node locations", userAgent, timeout) } - // Call update serially. - if err := lockedCall(lockKey, updateF); err != nil { - return err + if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { + return err } - log.Printf("[INFO] Updated node locations in Node Pool %s", name) } @@ -1651,12 +1637,9 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node // Wait until it's updated return containerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, "updating GKE node pool upgrade settings", userAgent, timeout) } - - // Call update serially. - if err := lockedCall(lockKey, updateF); err != nil { - return err + if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { + return err } - log.Printf("[INFO] Updated upgrade settings in Node Pool %s", name) } @@ -1685,9 +1668,8 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node timeout) } - // Call update serially. 
- if err := lockedCall(lockKey, updateF); err != nil { - return err + if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { + return err } log.Printf("[INFO] Updated workload_metadata_config for node pool %s", name) diff --git a/mmv1/third_party/terraform/tests/resource_container_node_pool_test.go.erb b/mmv1/third_party/terraform/tests/resource_container_node_pool_test.go.erb index 36c65e595d45..0154ec9ab3ed 100644 --- a/mmv1/third_party/terraform/tests/resource_container_node_pool_test.go.erb +++ b/mmv1/third_party/terraform/tests/resource_container_node_pool_test.go.erb @@ -1156,6 +1156,48 @@ func TestAccContainerNodePool_shieldedInstanceConfig(t *testing.T) { }) } +func TestAccContainerNodePool_concurrent(t *testing.T) { + t.Parallel() + + cluster := fmt.Sprintf("tf-test-cluster-%s", randString(t, 10)) + np1 := fmt.Sprintf("tf-test-nodepool-%s", randString(t, 10)) + np2 := fmt.Sprintf("tf-test-nodepool-%s", randString(t, 10)) + + vcrTest(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: testAccCheckContainerNodePoolDestroyProducer(t), + Steps: []resource.TestStep{ + { + Config: testAccContainerNodePool_concurrentCreate(cluster, np1, np2), + }, + { + ResourceName: "google_container_node_pool.np1", + ImportState: true, + ImportStateVerify: true, + }, + { + ResourceName: "google_container_node_pool.np2", + ImportState: true, + ImportStateVerify: true, + }, + { + Config: testAccContainerNodePool_concurrentUpdate(cluster, np1, np2), + }, + { + ResourceName: "google_container_node_pool.np1", + ImportState: true, + ImportStateVerify: true, + }, + { + ResourceName: "google_container_node_pool.np2", + ImportState: true, + ImportStateVerify: true, + }, + }, + }) +} + <% unless version == 'ga' -%> func TestAccContainerNodePool_ephemeralStorageConfig(t *testing.T) { t.Parallel() @@ -2730,4 +2772,52 @@ resource "google_container_node_pool" "np" { `, cluster, np) } +func testAccContainerNodePool_concurrentCreate(cluster, np1, np2 string) string { + return fmt.Sprintf(` +resource "google_container_cluster" "cluster" { + name = "%s" + location = "us-central1-a" + initial_node_count = 3 +} + +resource "google_container_node_pool" "np1" { + name = "%s" + location = "us-central1-a" + cluster = google_container_cluster.cluster.name + initial_node_count = 2 +} + +resource "google_container_node_pool" "np2" { + name = "%s" + location = "us-central1-a" + cluster = google_container_cluster.cluster.name + initial_node_count = 2 + } +`, cluster, np1, np2) +} +func testAccContainerNodePool_concurrentUpdate(cluster, np1, np2 string) string { + return fmt.Sprintf(` +resource "google_container_cluster" "cluster" { + name = "%s" + location = "us-central1-a" + initial_node_count = 3 +} + +resource "google_container_node_pool" "np1" { + name = "%s" + location = "us-central1-a" + cluster = google_container_cluster.cluster.name + initial_node_count = 2 + version = "1.23.13-gke.900" +} + +resource "google_container_node_pool" "np2" { + name = "%s" + location = "us-central1-a" + cluster = google_container_cluster.cluster.name + initial_node_count = 2 + version = "1.23.13-gke.900" + } +`, cluster, np1, np2) +} diff --git a/mmv1/third_party/terraform/utils/mutexkv.go b/mmv1/third_party/terraform/utils/mutexkv.go index e79710426425..cd0c53a4c41d 100644 --- a/mmv1/third_party/terraform/utils/mutexkv.go +++ b/mmv1/third_party/terraform/utils/mutexkv.go @@ -13,7 +13,7 @@ import ( // their access to individual security groups based on SG 
ID.
 type MutexKV struct {
 	lock  sync.Mutex
-	store map[string]*sync.Mutex
+	store map[string]*sync.RWMutex
 }
 
 // Locks the mutex for the given key. Caller is responsible for calling Unlock
@@ -31,13 +31,28 @@ func (m *MutexKV) Unlock(key string) {
 	log.Printf("[DEBUG] Unlocked %q", key)
 }
 
+// Acquires a read-lock on the mutex for the given key. Caller is responsible for calling RUnlock
+// for the same key
+func (m *MutexKV) RLock(key string) {
+	log.Printf("[DEBUG] RLocking %q", key)
+	m.get(key).RLock()
+	log.Printf("[DEBUG] RLocked %q", key)
+}
+
+// Releases a read-lock on the mutex for the given key. Caller must have called RLock for the same key first
+func (m *MutexKV) RUnlock(key string) {
+	log.Printf("[DEBUG] RUnlocking %q", key)
+	m.get(key).RUnlock()
+	log.Printf("[DEBUG] RUnlocked %q", key)
+}
+
 // Returns a mutex for the given key, no guarantee of its lock status
-func (m *MutexKV) get(key string) *sync.Mutex {
+func (m *MutexKV) get(key string) *sync.RWMutex {
 	m.lock.Lock()
 	defer m.lock.Unlock()
 	mutex, ok := m.store[key]
 	if !ok {
-		mutex = &sync.Mutex{}
+		mutex = &sync.RWMutex{}
 		m.store[key] = mutex
 	}
 	return mutex
@@ -46,6 +61,6 @@ func (m *MutexKV) get(key string) *sync.Mutex {
 // Returns a properly initialized MutexKV
 func NewMutexKV() *MutexKV {
 	return &MutexKV{
-		store: make(map[string]*sync.Mutex),
+		store: make(map[string]*sync.RWMutex),
 	}
 }
diff --git a/mmv1/third_party/terraform/utils/utils.go b/mmv1/third_party/terraform/utils/utils.go
index c04c93bcf633..14d92b562f20 100644
--- a/mmv1/third_party/terraform/utils/utils.go
+++ b/mmv1/third_party/terraform/utils/utils.go
@@ -13,6 +13,7 @@ import (
 	"time"
 
 	"github.com/hashicorp/errwrap"
+	"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
 	"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
 	"github.com/hashicorp/terraform-plugin-sdk/v2/terraform"
 	"google.golang.org/api/googleapi"
@@ -573,3 +574,19 @@ func checkGoogleIamPolicy(value string) error {
 	}
 	return nil
 }
+
+// Retries an operation while the canonical error code is FAILED_PRECONDITION
+// which indicates there is an incompatible operation already running on the
+// cluster. This error can be safely retried until the incompatible operation
+// completes, and the newly requested operation can begin.
+func retryWhileIncompatibleOperation(timeout time.Duration, lockKey string, f func() error) error {
+	return resource.Retry(timeout, func() *resource.RetryError {
+		if err := lockedCall(lockKey, f); err != nil {
+			if isFailedPreconditionError(err) {
+				return resource.RetryableError(err)
+			}
+			return resource.NonRetryableError(err)
+		}
+		return nil
+	})
+}
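
The locking model this patch moves to can be read directly from the diff: cluster-level work keeps taking an exclusive lock on the cluster key, while node pool create, update, and delete now take only a read lock on the cluster plus an exclusive lock on a per-node-pool key, so operations on different node pools in the same cluster can proceed concurrently. The sketch below is a minimal, self-contained illustration of that scheme; the MutexKV mirrors the patched mmv1/third_party/terraform/utils/mutexkv.go, while the key strings and the nodePoolOp helper are illustrative stand-ins rather than provider code.

package main

import (
	"fmt"
	"sync"
	"time"
)

// MutexKV mirrors the RWMutex-backed store from the patched mutexkv.go:
// a lazily populated map of named read/write locks.
type MutexKV struct {
	lock  sync.Mutex
	store map[string]*sync.RWMutex
}

func (m *MutexKV) get(key string) *sync.RWMutex {
	m.lock.Lock()
	defer m.lock.Unlock()
	if _, ok := m.store[key]; !ok {
		m.store[key] = &sync.RWMutex{}
	}
	return m.store[key]
}

func (m *MutexKV) Lock(key string)    { m.get(key).Lock() }
func (m *MutexKV) Unlock(key string)  { m.get(key).Unlock() }
func (m *MutexKV) RLock(key string)   { m.get(key).RLock() }
func (m *MutexKV) RUnlock(key string) { m.get(key).RUnlock() }

var mutexKV = &MutexKV{store: make(map[string]*sync.RWMutex)}

// nodePoolOp models what the node pool CRUD paths do after this change:
// share the cluster key (read lock) so sibling node pools can run in
// parallel, but hold an exclusive lock on the node pool's own key.
func nodePoolOp(clusterKey, npKey, desc string, wg *sync.WaitGroup) {
	defer wg.Done()
	mutexKV.RLock(clusterKey)
	defer mutexKV.RUnlock(clusterKey)
	mutexKV.Lock(npKey)
	defer mutexKV.Unlock(npKey)
	fmt.Println("running", desc)
	time.Sleep(100 * time.Millisecond)
}

func main() {
	cluster := "projects/p/locations/us-central1-a/clusters/c"
	var wg sync.WaitGroup
	wg.Add(2)
	// Both node pool operations hold the cluster read lock at the same time.
	go nodePoolOp(cluster, cluster+"/nodePools/np1", "update np1", &wg)
	go nodePoolOp(cluster, cluster+"/nodePools/np2", "update np2", &wg)
	wg.Wait()

	// A cluster-wide operation takes the write lock on the cluster key and
	// therefore waits for, and excludes, in-flight node pool operations.
	mutexKV.Lock(cluster)
	fmt.Println("cluster-wide operation runs alone")
	mutexKV.Unlock(cluster)
}

In the provider itself these keys come from clusterLockKey() and nodePoolLockKey() above, and the exclusive cluster lock remains what cluster-level operations are expected to acquire.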
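
Even with per-node-pool write locks, the API can still reject a request with FAILED_PRECONDITION when an incompatible operation is already running on the cluster (for example, an update on a sibling node pool), which is why the update paths are wrapped in retryWhileIncompatibleOperation. The snippet below is a self-contained simulation of that retry behaviour under stated assumptions: the error value and the hand-rolled loop are illustrative stand-ins, not the provider's resource.Retry / isFailedPreconditionError machinery.

package main

import (
	"errors"
	"fmt"
	"time"
)

// errIncompatibleOperation stands in for the FAILED_PRECONDITION error the
// API returns while another cluster operation is still in flight.
var errIncompatibleOperation = errors.New("FAILED_PRECONDITION: cluster is running incompatible operation")

// retryWhileIncompatible keeps re-invoking f until it stops reporting the
// incompatible-operation condition or the timeout elapses, mirroring what
// retryWhileIncompatibleOperation does via resource.Retry in the patch.
func retryWhileIncompatible(timeout time.Duration, f func() error) error {
	deadline := time.Now().Add(timeout)
	for {
		err := f()
		if err == nil || !errors.Is(err, errIncompatibleOperation) {
			return err
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("timed out waiting for competing operation: %w", err)
		}
		time.Sleep(50 * time.Millisecond)
	}
}

func main() {
	attempts := 0
	err := retryWhileIncompatible(time.Second, func() error {
		attempts++
		if attempts < 3 {
			// Early attempts lose the race against an operation on a
			// sibling node pool and see the retryable precondition error.
			return errIncompatibleOperation
		}
		return nil // The competing operation has finished.
	})
	fmt.Printf("attempts=%d err=%v\n", attempts, err)
}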