Skip to content

Commit

Permalink
Migrate GCE client to server side operation wait
Browse files Browse the repository at this point in the history
  • Loading branch information
mtrqq committed Feb 20, 2024
1 parent 5286b3f commit 687f141
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 74 deletions.
85 changes: 40 additions & 45 deletions cluster-autoscaler/cloudprovider/gce/autoscaling_gce_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ type AutoscalingGceClient interface {
ResizeMig(GceRef, int64) error
DeleteInstances(migRef GceRef, instances []GceRef) error
CreateInstances(GceRef, string, int64, []string) error

// extension point for new operation wrappers
WaitForOperation(operationName, operationType, project, zone string) error
}

type autoscalingGceClientV1 struct {
Expand All @@ -123,39 +126,34 @@ type autoscalingGceClientV1 struct {
projectId string
domainUrl string

// These can be overridden, e.g. for testing.
operationWaitTimeout time.Duration
operationPollInterval time.Duration
operationDeletionPollInterval time.Duration
// Can be overridden, e.g. for testing.
operationWaitTimeout time.Duration
}

// NewAutoscalingGceClientV1WithTimeout creates a new client with custom timeouts
// for communicating with GCE v1 API
func NewAutoscalingGceClientV1WithTimeout(client *http.Client, projectId string, userAgent string,
waitTimeout, pollInterval, deletionPollInterval time.Duration) (*autoscalingGceClientV1, error) {
func NewAutoscalingGceClientV1WithTimeout(client *http.Client, projectId string, userAgent string, waitTimeout time.Duration) (*autoscalingGceClientV1, error) {
gceService, err := gce.New(client)
if err != nil {
return nil, err
}
gceService.UserAgent = userAgent

return &autoscalingGceClientV1{
projectId: projectId,
gceService: gceService,
operationWaitTimeout: waitTimeout,
operationPollInterval: pollInterval,
operationDeletionPollInterval: deletionPollInterval,
projectId: projectId,
gceService: gceService,
operationWaitTimeout: waitTimeout,
}, nil
}

// NewAutoscalingGceClientV1 creates a new client for communicating with GCE v1 API.
func NewAutoscalingGceClientV1(client *http.Client, projectId string, userAgent string) (*autoscalingGceClientV1, error) {
return NewAutoscalingGceClientV1WithTimeout(client, projectId, userAgent, defaultOperationWaitTimeout, defaultOperationPollInterval, defaultOperationDeletionPollInterval)
return NewAutoscalingGceClientV1WithTimeout(client, projectId, userAgent, defaultOperationWaitTimeout)
}

// NewCustomAutoscalingGceClientV1 creates a new client using custom server url and timeouts
// for communicating with GCE v1 API.
func NewCustomAutoscalingGceClientV1(client *http.Client, projectId, serverUrl, userAgent, domainUrl string,
waitTimeout, pollInterval time.Duration, deletionPollInterval time.Duration) (*autoscalingGceClientV1, error) {
func NewCustomAutoscalingGceClientV1(client *http.Client, projectId, serverUrl, userAgent, domainUrl string, waitTimeout time.Duration) (*autoscalingGceClientV1, error) {
gceService, err := gce.New(client)
if err != nil {
return nil, err
Expand All @@ -164,12 +162,10 @@ func NewCustomAutoscalingGceClientV1(client *http.Client, projectId, serverUrl,
gceService.UserAgent = userAgent

return &autoscalingGceClientV1{
projectId: projectId,
gceService: gceService,
domainUrl: domainUrl,
operationWaitTimeout: waitTimeout,
operationPollInterval: pollInterval,
operationDeletionPollInterval: deletionPollInterval,
projectId: projectId,
gceService: gceService,
domainUrl: domainUrl,
operationWaitTimeout: waitTimeout,
}, nil
}

Expand Down Expand Up @@ -240,7 +236,7 @@ func (client *autoscalingGceClientV1) ResizeMig(migRef GceRef, size int64) error
if err != nil {
return err
}
return client.waitForOp(op, migRef.Project, migRef.Zone, false)
return client.WaitForOperation(op.Name, op.OperationType, migRef.Project, migRef.Zone)
}

func (client *autoscalingGceClientV1) CreateInstances(migRef GceRef, baseName string, delta int64, existingInstanceProviderIds []string) error {
Expand All @@ -257,7 +253,7 @@ func (client *autoscalingGceClientV1) CreateInstances(migRef GceRef, baseName st
if err != nil {
return err
}
return client.waitForOp(op, migRef.Project, migRef.Zone, false)
return client.WaitForOperation(op.Name, op.OperationType, migRef.Project, migRef.Zone)
}

func instanceIdsToNamesMap(instanceProviderIds []string) map[string]bool {
Expand All @@ -274,32 +270,31 @@ func instanceIdsToNamesMap(instanceProviderIds []string) map[string]bool {
return instanceNames
}

func (client *autoscalingGceClientV1) waitForOp(operation *gce.Operation, project, zone string, isDeletion bool) error {
pollInterval := client.operationPollInterval
if isDeletion {
pollInterval = client.operationDeletionPollInterval
}
for start := time.Now(); time.Since(start) < client.operationWaitTimeout; time.Sleep(pollInterval) {
klog.V(4).Infof("Waiting for operation %s %s %s", project, zone, operation.Name)
registerRequest("zone_operations", "get")
if op, err := client.gceService.ZoneOperations.Get(project, zone, operation.Name).Do(); err == nil {
klog.V(4).Infof("Operation %s %s %s status: %s", project, zone, operation.Name, op.Status)
if op.Status == "DONE" {
if op.Error != nil {
errBytes, err := op.Error.MarshalJSON()
if err != nil {
errBytes = []byte(fmt.Sprintf("operation failed, but error couldn't be recovered: %v", err))
}
return fmt.Errorf("error while getting operation %s on %s: %s", operation.Name, operation.TargetLink, errBytes)
}
func (client *autoscalingGceClientV1) WaitForOperation(operationName, operationType, project, zone string) error {
ctx, cancel := context.WithTimeout(context.TODO(), client.operationWaitTimeout)
defer cancel()

for {
klog.V(4).Infof("Waiting for operation %s (%s/%s/%s)", operationName, project, zone, operationType)
registerRequest("zone_operations", "wait")
op, err := client.gceService.ZoneOperations.Wait(project, zone, operationName).Context(ctx).Do()
if err != nil {
return fmt.Errorf("error while waiting for operation %s: %w", operationName, err)
}

return nil
klog.V(4).Infof("Operation %s (%s/%s/%s) status: %s", operationName, project, zone, operationType, op.Status)
if op.Status == "DONE" {
if op.Error != nil {
errBytes, err := op.Error.MarshalJSON()
if err != nil {
errBytes = []byte(fmt.Sprintf("operation failed, but error couldn't be recovered: %v", err))
}
return fmt.Errorf("error while waiting for operation %s: %s", op.Name, errBytes)
}
} else {
klog.Warningf("Error while getting operation %s on %s: %v", operation.Name, operation.TargetLink, err)

return nil
}
}
return fmt.Errorf("timeout while waiting for operation %s on %s to complete.", operation.Name, operation.TargetLink)
}

func (client *autoscalingGceClientV1) DeleteInstances(migRef GceRef, instances []GceRef) error {
Expand All @@ -315,7 +310,7 @@ func (client *autoscalingGceClientV1) DeleteInstances(migRef GceRef, instances [
if err != nil {
return err
}
return client.waitForOp(op, migRef.Project, migRef.Zone, true)
return client.WaitForOperation(op.Name, op.OperationType, migRef.Project, migRef.Zone)
}

func (client *autoscalingGceClientV1) FetchMigInstances(migRef GceRef) ([]GceInstance, error) {
Expand Down
49 changes: 24 additions & 25 deletions cluster-autoscaler/cloudprovider/gce/autoscaling_gce_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package gce

import (
"context"
"encoding/json"
"fmt"
"net/http"
Expand Down Expand Up @@ -92,15 +93,11 @@ func TestWaitForOp(t *testing.T) {
defer server.Close()
g := newTestAutoscalingGceClient(t, "project1", server.URL, "")

g.operationPollInterval = 1 * time.Millisecond
g.operationWaitTimeout = 500 * time.Millisecond

server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505728466148-d16f5197").Return(operationRunningResponse).Times(3)
server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505728466148-d16f5197").Return(operationDoneResponse).Once()

server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505728466148-d16f5197/wait").Return(operationRunningResponse).Times(3)
server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505728466148-d16f5197/wait").Return(operationDoneResponse).Once()
operation := &gce_api.Operation{Name: "operation-1505728466148-d16f5197"}

err := g.waitForOp(operation, projectId, zoneB, false)
err := g.WaitForOperation(operation.Name, "TestWaitForOp", projectId, zoneB)
assert.NoError(t, err)
mock.AssertExpectationsForObjects(t, server)
}
Expand All @@ -110,11 +107,10 @@ func TestWaitForOpError(t *testing.T) {
defer server.Close()
g := newTestAutoscalingGceClient(t, "project1", server.URL, "")

server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505728466148-d16f5197").Return(operationDoneResponseError).Once()

server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505728466148-d16f5197/wait").Return(operationDoneResponseError).Once()
operation := &gce_api.Operation{Name: "operation-1505728466148-d16f5197"}

err := g.waitForOp(operation, projectId, zoneB, false)
err := g.WaitForOperation(operation.Name, "TestWaitForOpError", projectId, zoneB)
assert.Error(t, err)
}

Expand All @@ -123,21 +119,28 @@ func TestWaitForOpTimeout(t *testing.T) {
defer server.Close()
g := newTestAutoscalingGceClient(t, "project1", server.URL, "")

// The values here are higher than in other tests since we're aiming for timeout.
// Lower values make this fragile and flakey.
g.operationPollInterval = 10 * time.Millisecond
g.operationWaitTimeout = 49 * time.Millisecond

// Sometimes, only 3 calls are made, but it doesn't really matter,
// so let's not assert expectations for this mock, just check for timeout error.
server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505728466148-d16f5197").Return(operationRunningResponse).Times(5)
server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505728466148-d16f5197/wait").Return(operationRunningResponse).Once()

operation := &gce_api.Operation{Name: "operation-1505728466148-d16f5197"}

err := g.waitForOp(operation, projectId, zoneB, false)
err := g.WaitForOperation(operation.Name, "TestWaitForOpTimeout", projectId, zoneB)
assert.Error(t, err)
}

func TestWaitForOpContextTimeout(t *testing.T) {
server := test_util.NewHttpServerMock()
defer server.Close()
g := newTestAutoscalingGceClient(t, "project1", server.URL, "")

g.operationWaitTimeout = 1 * time.Millisecond

server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505728466148-d16f5197/wait").After(50 * time.Millisecond).Return(operationDoneResponse).Once()
operation := &gce_api.Operation{Name: "operation-1505728466148-d16f5197"}

err := g.WaitForOperation(operation.Name, "TestWaitForOpContextTimeout", projectId, zoneB)
assert.ErrorIs(t, err, context.DeadlineExceeded)
}

func TestErrors(t *testing.T) {
const instanceUrl = "https://content.googleapis.com/compute/v1/projects/myprojid/zones/myzone/instances/myinst"
server := test_util.NewHttpServerMock()
Expand Down Expand Up @@ -553,12 +556,8 @@ func TestUserAgent(t *testing.T) {
defer server.Close()
g := newTestAutoscalingGceClient(t, "project1", server.URL, "testuseragent")

g.operationPollInterval = 10 * time.Millisecond
g.operationWaitTimeout = 49 * time.Millisecond

server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505728466148-d16f5197").Return("testuseragent", operationRunningResponse).Maybe()

server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505728466148-d16f5197/wait").Return("testuseragent", operationRunningResponse).Maybe()
operation := &gce_api.Operation{Name: "operation-1505728466148-d16f5197"}

g.waitForOp(operation, projectId, zoneB, false)
g.WaitForOperation(operation.Name, "TestUserAgent", projectId, zoneB)
}
7 changes: 3 additions & 4 deletions cluster-autoscaler/cloudprovider/gce/gce_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,6 @@ func newTestGceManager(t *testing.T, testServerURL string, regional bool) *gceMa

// Override wait for op timeouts.
gceService.operationWaitTimeout = 50 * time.Millisecond
gceService.operationPollInterval = 1 * time.Millisecond

cache := &GceCache{
migs: make(map[GceRef]Mig),
Expand Down Expand Up @@ -473,7 +472,7 @@ func TestDeleteInstances(t *testing.T) {
server.On("handle", "/projects/project1/zones/us-central1-b/instanceGroupManagers/gke-cluster-1-default-pool/listManagedInstances").Return(buildFourRunningInstancesOnDefaultMigManagedInstancesResponse(zoneB)).Once()

server.On("handle", "/projects/project1/zones/us-central1-b/instanceGroupManagers/gke-cluster-1-default-pool/deleteInstances").Return(deleteInstancesResponse).Once()
server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505802641136-55984ff86d980-a99e8c2b-0c8aaaaa").Return(deleteInstancesOperationResponse).Once()
server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505802641136-55984ff86d980-a99e8c2b-0c8aaaaa/wait").Return(deleteInstancesOperationResponse).Once()

instances := []GceRef{
{
Expand Down Expand Up @@ -583,7 +582,7 @@ func TestGetAndSetMigSize(t *testing.T) {

// set target size for extraPoolMig; will require resize API call and API call for polling for resize operation
server.On("handle", fmt.Sprintf("/projects/project1/zones/us-central1-b/instanceGroupManagers/%s/resize", extraPoolMigName)).Return(setMigSizeResponse).Once()
server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505739408819-5597646964339-eb839c88-28805931").Return(setMigSizeOperationResponse).Once()
server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505739408819-5597646964339-eb839c88-28805931/wait").Return(setMigSizeOperationResponse).Once()
err = g.SetMigSize(extraPoolMig, 4)
assert.NoError(t, err)
mock.AssertExpectationsForObjects(t, server)
Expand Down Expand Up @@ -1509,7 +1508,7 @@ func TestAppendInstances(t *testing.T) {
defaultPoolMig := setupTestDefaultPool(g, true)
server.On("handle", "/projects/project1/zones/us-central1-b/instanceGroupManagers/gke-cluster-1-default-pool/listManagedInstances").Return(buildFourRunningInstancesOnDefaultMigManagedInstancesResponse(zoneB)).Once()
server.On("handle", fmt.Sprintf("/projects/project1/zones/us-central1-b/instanceGroupManagers/%v/createInstances", defaultPoolMig.gceRef.Name)).Return(createInstancesResponse).Once()
server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1624366531120-5c55a4e128c15-fc5daa90-e1ef6c32").Return(createInstancesOperationResponse).Once()
server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1624366531120-5c55a4e128c15-fc5daa90-e1ef6c32/wait").Return(createInstancesOperationResponse).Once()
err := g.CreateInstances(defaultPoolMig, 2)
assert.NoError(t, err)
mock.AssertExpectationsForObjects(t, server)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ func (client *mockAutoscalingGceClient) CreateInstances(_ GceRef, _ string, _ in
return nil
}

func (client *mockAutoscalingGceClient) WaitForOperation(_, _, _, _ string) error {
return nil
}

func TestFillMigInstances(t *testing.T) {
migRef := GceRef{Project: "test", Zone: "zone-A", Name: "some-mig"}
oldInstances := []GceInstance{
Expand Down

0 comments on commit 687f141

Please sign in to comment.