Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate GCE client to server-side wait #6547

Merged
merged 1 commit into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 53 additions & 48 deletions cluster-autoscaler/cloudprovider/gce/autoscaling_gce_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,8 @@ import (
)

const (
defaultOperationWaitTimeout = 20 * time.Second
defaultOperationPollInterval = 100 * time.Millisecond
defaultOperationDeletionPollInterval = 1 * time.Second
defaultOperationWaitTimeout = 20 * time.Second
defaultOperationPollInterval = 100 * time.Millisecond
// ErrorCodeQuotaExceeded is an error code used in InstanceErrorInfo if quota exceeded error occurs.
ErrorCodeQuotaExceeded = "QUOTA_EXCEEDED"

Expand Down Expand Up @@ -115,6 +114,11 @@ type AutoscalingGceClient interface {
ResizeMig(GceRef, int64) error
DeleteInstances(migRef GceRef, instances []GceRef) error
CreateInstances(GceRef, string, int64, []string) error

// WaitForOperation can be used to poll GCE operations until completion/timeout using WAIT calls.
// Calling this is normally not needed when interacting with the client, other methods should call it internally.
// Can be used to extend the interface with more methods outside of this package.
WaitForOperation(operationName, operationType, project, zone string) error
}

type autoscalingGceClientV1 struct {
Expand All @@ -123,39 +127,36 @@ type autoscalingGceClientV1 struct {
projectId string
domainUrl string

// These can be overridden, e.g. for testing.
operationWaitTimeout time.Duration
operationPollInterval time.Duration
operationDeletionPollInterval time.Duration
// Can be overridden, e.g. for testing.
operationWaitTimeout time.Duration
operationPollInterval time.Duration
}

// NewAutoscalingGceClientV1WithTimeout creates a new client with custom timeouts
// for communicating with GCE v1 API
func NewAutoscalingGceClientV1WithTimeout(client *http.Client, projectId string, userAgent string,
waitTimeout, pollInterval, deletionPollInterval time.Duration) (*autoscalingGceClientV1, error) {
func NewAutoscalingGceClientV1WithTimeout(client *http.Client, projectId string, userAgent string, waitTimeout time.Duration, pollInterval time.Duration) (*autoscalingGceClientV1, error) {
gceService, err := gce.New(client)
if err != nil {
return nil, err
}
gceService.UserAgent = userAgent

return &autoscalingGceClientV1{
projectId: projectId,
gceService: gceService,
operationWaitTimeout: waitTimeout,
operationPollInterval: pollInterval,
operationDeletionPollInterval: deletionPollInterval,
projectId: projectId,
gceService: gceService,
operationWaitTimeout: waitTimeout,
operationPollInterval: pollInterval,
}, nil
}

// NewAutoscalingGceClientV1 creates a new client for communicating with GCE v1 API.
func NewAutoscalingGceClientV1(client *http.Client, projectId string, userAgent string) (*autoscalingGceClientV1, error) {
return NewAutoscalingGceClientV1WithTimeout(client, projectId, userAgent, defaultOperationWaitTimeout, defaultOperationPollInterval, defaultOperationDeletionPollInterval)
return NewAutoscalingGceClientV1WithTimeout(client, projectId, userAgent, defaultOperationWaitTimeout, defaultOperationPollInterval)
}

// NewCustomAutoscalingGceClientV1 creates a new client using custom server url and timeouts
// for communicating with GCE v1 API.
func NewCustomAutoscalingGceClientV1(client *http.Client, projectId, serverUrl, userAgent, domainUrl string,
waitTimeout, pollInterval time.Duration, deletionPollInterval time.Duration) (*autoscalingGceClientV1, error) {
func NewCustomAutoscalingGceClientV1(client *http.Client, projectId, serverUrl, userAgent, domainUrl string, waitTimeout time.Duration, pollInterval time.Duration) (*autoscalingGceClientV1, error) {
gceService, err := gce.New(client)
if err != nil {
return nil, err
Expand All @@ -164,12 +165,11 @@ func NewCustomAutoscalingGceClientV1(client *http.Client, projectId, serverUrl,
gceService.UserAgent = userAgent

return &autoscalingGceClientV1{
projectId: projectId,
gceService: gceService,
domainUrl: domainUrl,
operationWaitTimeout: waitTimeout,
operationPollInterval: pollInterval,
operationDeletionPollInterval: deletionPollInterval,
projectId: projectId,
gceService: gceService,
domainUrl: domainUrl,
operationWaitTimeout: waitTimeout,
operationPollInterval: pollInterval,
}, nil
}

Expand Down Expand Up @@ -240,7 +240,7 @@ func (client *autoscalingGceClientV1) ResizeMig(migRef GceRef, size int64) error
if err != nil {
return err
}
return client.waitForOp(op, migRef.Project, migRef.Zone, false)
return client.WaitForOperation(op.Name, op.OperationType, migRef.Project, migRef.Zone)
}

func (client *autoscalingGceClientV1) CreateInstances(migRef GceRef, baseName string, delta int64, existingInstanceProviderIds []string) error {
Expand All @@ -257,7 +257,7 @@ func (client *autoscalingGceClientV1) CreateInstances(migRef GceRef, baseName st
if err != nil {
return err
}
return client.waitForOp(op, migRef.Project, migRef.Zone, false)
return client.WaitForOperation(op.Name, op.OperationType, migRef.Project, migRef.Zone)
}

func instanceIdsToNamesMap(instanceProviderIds []string) map[string]bool {
Expand All @@ -274,32 +274,37 @@ func instanceIdsToNamesMap(instanceProviderIds []string) map[string]bool {
return instanceNames
}

func (client *autoscalingGceClientV1) waitForOp(operation *gce.Operation, project, zone string, isDeletion bool) error {
pollInterval := client.operationPollInterval
if isDeletion {
pollInterval = client.operationDeletionPollInterval
}
for start := time.Now(); time.Since(start) < client.operationWaitTimeout; time.Sleep(pollInterval) {
klog.V(4).Infof("Waiting for operation %s %s %s", project, zone, operation.Name)
registerRequest("zone_operations", "get")
if op, err := client.gceService.ZoneOperations.Get(project, zone, operation.Name).Do(); err == nil {
klog.V(4).Infof("Operation %s %s %s status: %s", project, zone, operation.Name, op.Status)
if op.Status == "DONE" {
if op.Error != nil {
errBytes, err := op.Error.MarshalJSON()
if err != nil {
errBytes = []byte(fmt.Sprintf("operation failed, but error couldn't be recovered: %v", err))
}
return fmt.Errorf("error while getting operation %s on %s: %s", operation.Name, operation.TargetLink, errBytes)
}
// WaitForOperation can be used to poll GCE operations until completion/timeout using WAIT calls.
// Calling this is normally not needed when interacting with the client, other methods should call it internally.
// Can be used to extend the interface with more methods outside of this package.
func (client *autoscalingGceClientV1) WaitForOperation(operationName, operationType, project, zone string) error {
ctx, cancel := context.WithTimeout(context.Background(), client.operationWaitTimeout)
defer cancel()

for {
mtrqq marked this conversation as resolved.
Show resolved Hide resolved
klog.V(4).Infof("Waiting for operation %s/%s (%s/%s)", operationType, operationName, project, zone)
registerRequest("zone_operations", "wait")
op, err := client.gceService.ZoneOperations.Wait(project, zone, operationName).Context(ctx).Do()
if err != nil {
return fmt.Errorf("error while waiting for operation %s/%s: %w", operationType, operationName, err)
}

return nil
klog.V(4).Infof("Operation %s/%s (%s/%s) status: %s", operationType, operationName, project, zone, op.Status)
if op.Status == "DONE" {
if op.Error != nil {
errBytes, err := op.Error.MarshalJSON()
if err != nil {
errBytes = []byte(fmt.Sprintf("operation failed, but error couldn't be recovered: %v", err))
}
return fmt.Errorf("error while waiting for operation %s/%s: %s", operationType, operationName, errBytes)
}
} else {
klog.Warningf("Error while getting operation %s on %s: %v", operation.Name, operation.TargetLink, err)

return nil
}

// NOTE: sleep in order not to overload server, as potentially response may be returned immediately
time.Sleep(client.operationPollInterval)
}
return fmt.Errorf("timeout while waiting for operation %s on %s to complete.", operation.Name, operation.TargetLink)
}

func (client *autoscalingGceClientV1) DeleteInstances(migRef GceRef, instances []GceRef) error {
Expand All @@ -315,7 +320,7 @@ func (client *autoscalingGceClientV1) DeleteInstances(migRef GceRef, instances [
if err != nil {
return err
}
return client.waitForOp(op, migRef.Project, migRef.Zone, true)
return client.WaitForOperation(op.Name, op.OperationType, migRef.Project, migRef.Zone)
}

func (client *autoscalingGceClientV1) FetchMigInstances(migRef GceRef) ([]GceInstance, error) {
Expand Down
58 changes: 32 additions & 26 deletions cluster-autoscaler/cloudprovider/gce/autoscaling_gce_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package gce

import (
"context"
"encoding/json"
"fmt"
"net/http"
Expand Down Expand Up @@ -92,15 +93,13 @@ func TestWaitForOp(t *testing.T) {
defer server.Close()
g := newTestAutoscalingGceClient(t, "project1", server.URL, "")

// default polling interval is too big for testing purposes
g.operationPollInterval = 1 * time.Millisecond
g.operationWaitTimeout = 500 * time.Millisecond

server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505728466148-d16f5197").Return(operationRunningResponse).Times(3)
server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505728466148-d16f5197").Return(operationDoneResponse).Once()
server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505728466148-d16f5197/wait").Return(operationRunningResponse).Times(3)
server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505728466148-d16f5197/wait").Return(operationDoneResponse).Once()

operation := &gce_api.Operation{Name: "operation-1505728466148-d16f5197"}

err := g.waitForOp(operation, projectId, zoneB, false)
err := g.WaitForOperation("operation-1505728466148-d16f5197", "TestWaitForOp", projectId, zoneB)
assert.NoError(t, err)
mock.AssertExpectationsForObjects(t, server)
}
Expand All @@ -110,32 +109,41 @@ func TestWaitForOpError(t *testing.T) {
defer server.Close()
g := newTestAutoscalingGceClient(t, "project1", server.URL, "")

server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505728466148-d16f5197").Return(operationDoneResponseError).Once()

operation := &gce_api.Operation{Name: "operation-1505728466148-d16f5197"}
server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505728466148-d16f5197/wait").Return(operationDoneResponseError).Once()

err := g.waitForOp(operation, projectId, zoneB, false)
err := g.WaitForOperation("operation-1505728466148-d16f5197", "TestWaitForOpError", projectId, zoneB)
assert.Error(t, err)
mtrqq marked this conversation as resolved.
Show resolved Hide resolved
mock.AssertExpectationsForObjects(t, server)
}

func TestWaitForOpTimeout(t *testing.T) {
server := test_util.NewHttpServerMock()
defer server.Close()
g := newTestAutoscalingGceClient(t, "project1", server.URL, "")

// The values here are higher than in other tests since we're aiming for timeout.
// Lower values make this fragile and flakey.
g.operationPollInterval = 10 * time.Millisecond
g.operationWaitTimeout = 49 * time.Millisecond
// default polling interval and wait time are too big for the test
g.operationWaitTimeout = 10 * time.Millisecond
g.operationPollInterval = 20 * time.Millisecond

// Sometimes, only 3 calls are made, but it doesn't really matter,
// so let's not assert expectations for this mock, just check for timeout error.
server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505728466148-d16f5197").Return(operationRunningResponse).Times(5)
server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505728466148-d16f5197/wait").Return(operationRunningResponse).Once()

operation := &gce_api.Operation{Name: "operation-1505728466148-d16f5197"}

err := g.waitForOp(operation, projectId, zoneB, false)
err := g.WaitForOperation("operation-1505728466148-d16f5197", "TestWaitForOpTimeout", projectId, zoneB)
assert.Error(t, err)
mock.AssertExpectationsForObjects(t, server)
}

func TestWaitForOpContextTimeout(t *testing.T) {
server := test_util.NewHttpServerMock()
defer server.Close()
g := newTestAutoscalingGceClient(t, "project1", server.URL, "")

g.operationWaitTimeout = 10 * time.Millisecond

server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505728466148-d16f5197/wait").After(time.Minute).Return(operationDoneResponse).Once()

err := g.WaitForOperation("operation-1505728466148-d16f5197", "TestWaitForOpContextTimeout", projectId, zoneB)
assert.ErrorIs(t, err, context.DeadlineExceeded)
mock.AssertExpectationsForObjects(t, server)
}

func TestErrors(t *testing.T) {
Expand Down Expand Up @@ -553,12 +561,10 @@ func TestUserAgent(t *testing.T) {
defer server.Close()
g := newTestAutoscalingGceClient(t, "project1", server.URL, "testuseragent")

g.operationPollInterval = 10 * time.Millisecond
g.operationWaitTimeout = 49 * time.Millisecond

server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505728466148-d16f5197").Return("testuseragent", operationRunningResponse).Maybe()
server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505728466148-d16f5197/wait").Return("testuseragent", operationDoneResponse).Maybe()

operation := &gce_api.Operation{Name: "operation-1505728466148-d16f5197"}
err := g.WaitForOperation("operation-1505728466148-d16f5197", "TestUserAgent", projectId, zoneB)

g.waitForOp(operation, projectId, zoneB, false)
assert.NoError(t, err)
mock.AssertExpectationsForObjects(t, server)
}
7 changes: 3 additions & 4 deletions cluster-autoscaler/cloudprovider/gce/gce_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,6 @@ func newTestGceManager(t *testing.T, testServerURL string, regional bool) *gceMa

// Override wait for op timeouts.
gceService.operationWaitTimeout = 50 * time.Millisecond
gceService.operationPollInterval = 1 * time.Millisecond

cache := &GceCache{
migs: make(map[GceRef]Mig),
Expand Down Expand Up @@ -473,7 +472,7 @@ func TestDeleteInstances(t *testing.T) {
server.On("handle", "/projects/project1/zones/us-central1-b/instanceGroupManagers/gke-cluster-1-default-pool/listManagedInstances").Return(buildFourRunningInstancesOnDefaultMigManagedInstancesResponse(zoneB)).Once()

server.On("handle", "/projects/project1/zones/us-central1-b/instanceGroupManagers/gke-cluster-1-default-pool/deleteInstances").Return(deleteInstancesResponse).Once()
server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505802641136-55984ff86d980-a99e8c2b-0c8aaaaa").Return(deleteInstancesOperationResponse).Once()
server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505802641136-55984ff86d980-a99e8c2b-0c8aaaaa/wait").Return(deleteInstancesOperationResponse).Once()

instances := []GceRef{
{
Expand Down Expand Up @@ -583,7 +582,7 @@ func TestGetAndSetMigSize(t *testing.T) {

// set target size for extraPoolMig; will require resize API call and API call for polling for resize operation
server.On("handle", fmt.Sprintf("/projects/project1/zones/us-central1-b/instanceGroupManagers/%s/resize", extraPoolMigName)).Return(setMigSizeResponse).Once()
server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505739408819-5597646964339-eb839c88-28805931").Return(setMigSizeOperationResponse).Once()
server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505739408819-5597646964339-eb839c88-28805931/wait").Return(setMigSizeOperationResponse).Once()
err = g.SetMigSize(extraPoolMig, 4)
assert.NoError(t, err)
mock.AssertExpectationsForObjects(t, server)
Expand Down Expand Up @@ -1509,7 +1508,7 @@ func TestAppendInstances(t *testing.T) {
defaultPoolMig := setupTestDefaultPool(g, true)
server.On("handle", "/projects/project1/zones/us-central1-b/instanceGroupManagers/gke-cluster-1-default-pool/listManagedInstances").Return(buildFourRunningInstancesOnDefaultMigManagedInstancesResponse(zoneB)).Once()
server.On("handle", fmt.Sprintf("/projects/project1/zones/us-central1-b/instanceGroupManagers/%v/createInstances", defaultPoolMig.gceRef.Name)).Return(createInstancesResponse).Once()
server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1624366531120-5c55a4e128c15-fc5daa90-e1ef6c32").Return(createInstancesOperationResponse).Once()
server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1624366531120-5c55a4e128c15-fc5daa90-e1ef6c32/wait").Return(createInstancesOperationResponse).Once()
err := g.CreateInstances(defaultPoolMig, 2)
assert.NoError(t, err)
mock.AssertExpectationsForObjects(t, server)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ func (client *mockAutoscalingGceClient) CreateInstances(_ GceRef, _ string, _ in
return nil
}

func (client *mockAutoscalingGceClient) WaitForOperation(_, _, _, _ string) error {
return nil
}

func TestFillMigInstances(t *testing.T) {
migRef := GceRef{Project: "test", Zone: "zone-A", Name: "some-mig"}
oldInstances := []GceInstance{
Expand Down
Loading