Skip to content

Commit

Permalink
Fix for cluster is not permitted error (#1367)
Browse files Browse the repository at this point in the history
* Fix for cluster is not permitted error

* Fixing test
  • Loading branch information
kushalmalani authored Aug 25, 2023
1 parent 4fa1191 commit f142e1d
Show file tree
Hide file tree
Showing 2 changed files with 189 additions and 15 deletions.
35 changes: 20 additions & 15 deletions cloud/deployment/fromfile/fromfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,21 +86,7 @@ func CreateOrUpdate(inputFile, action string, client astro.Client, coreClient as
return err
}

if deployment.IsDeploymentHosted(formattedDeployment.Deployment.Configuration.DeploymentType) {
getSharedClusterParams := astrocore.GetSharedClusterParams{
Region: formattedDeployment.Deployment.Configuration.Region,
CloudProvider: astrocore.GetSharedClusterParamsCloudProvider(formattedDeployment.Deployment.Configuration.CloudProvider),
}
response, err := coreClient.GetSharedClusterWithResponse(context.Background(), &getSharedClusterParams)
if err != nil {
return err
}
err = astrocore.NormalizeAPIError(response.HTTPResponse, response.Body)
if err != nil {
return err
}
clusterID = response.JSON200.Id
} else {
if !deployment.IsDeploymentHosted(formattedDeployment.Deployment.Configuration.DeploymentType) {
// map cluster name to id and collect node pools for cluster
clusterID, nodePools, err = getClusterInfoFromName(formattedDeployment.Deployment.Configuration.ClusterName, c.OrganizationShortName, coreClient)
if err != nil {
Expand All @@ -114,6 +100,21 @@ func CreateOrUpdate(inputFile, action string, client astro.Client, coreClient as
}
switch action {
case createAction:
if deployment.IsDeploymentHosted(formattedDeployment.Deployment.Configuration.DeploymentType) {
getSharedClusterParams := astrocore.GetSharedClusterParams{
Region: formattedDeployment.Deployment.Configuration.Region,
CloudProvider: astrocore.GetSharedClusterParamsCloudProvider(formattedDeployment.Deployment.Configuration.CloudProvider),
}
response, err := coreClient.GetSharedClusterWithResponse(context.Background(), &getSharedClusterParams)
if err != nil {
return err
}
err = astrocore.NormalizeAPIError(response.HTTPResponse, response.Body)
if err != nil {
return err
}
clusterID = response.JSON200.Id
}
// map workspace name to id
workspaceID, err = getWorkspaceIDFromName(formattedDeployment.Deployment.Configuration.WorkspaceName, c.Organization, coreClient)
if err != nil {
Expand Down Expand Up @@ -166,6 +167,10 @@ func CreateOrUpdate(inputFile, action string, client astro.Client, coreClient as
dagDeploy = *formattedDeployment.Deployment.Configuration.DagDeployEnabled
}

if deployment.IsDeploymentHosted(formattedDeployment.Deployment.Configuration.DeploymentType) {
clusterID = existingDeployment.Cluster.ID
}

// transform formattedDeployment to DeploymentUpdateInput
_, updateInput, err = getCreateOrUpdateInput(&formattedDeployment, clusterID, workspaceID, updateAction, &existingDeployment, nodePools, dagDeploy, client)
if err != nil {
Expand Down
169 changes: 169 additions & 0 deletions cloud/deployment/fromfile/fromfile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1985,6 +1985,175 @@ deployment:
mockClient.AssertExpectations(t)
mockCoreClient.AssertExpectations(t)
})
t.Run("reads the yaml file and updates an existing hosted standard deployment", func(t *testing.T) {
testUtil.InitTestConfig(testUtil.CloudPlatform)
out := new(bytes.Buffer)
mockClient := new(astro_mocks.Client)
mockCoreClient := new(astrocore_mocks.ClientWithResponsesInterface)
filePath = "./deployment.yaml"
data = `
deployment:
environment_variables:
- is_secret: false
key: foo
updated_at: NOW
value: bar
- is_secret: true
key: bar
updated_at: NOW+1
value: baz
configuration:
name: test-deployment-label
description: description 1
runtime_version: 6.0.0
dag_deploy_enabled: true
executor: CeleryExecutor
scheduler_au: 5
scheduler_count: 3
cluster_name: us-east-1
workspace_name: test-workspace
deployment_type: HOSTED_SHARED
worker_queues:
- name: default
is_default: true
max_worker_count: 20
min_worker_count: 5
worker_concurrency: 10
worker_type: a10
- name: test-queue-1
is_default: false
max_worker_count: 20
min_worker_count: 8
worker_concurrency: 10
worker_type: a10
metadata:
deployment_id: test-deployment-id
workspace_id: test-ws-id
cluster_id: cluster-id
release_name: great-release-name
airflow_version: 2.4.0
status: HEALTHY
created_at: 2022-11-17T13:25:55.275697-08:00
updated_at: 2022-11-17T13:25:55.275697-08:00
deployment_url: cloud.astronomer.io/test-ws-id/deployments/test-deployment-id/analytics
webserver_url: some-url
alert_emails:
- test1@test.com
- test2@test.com
`
orgID = "test-org-id"
mockEnvVarResponse := []astro.EnvironmentVariablesObject{
{
IsSecret: false,
Key: "foo",
Value: "bar",
UpdatedAt: "NOW",
},
{
IsSecret: true,
Key: "bar",
Value: "baz",
UpdatedAt: "NOW+1",
},
}
mockWorkerQueueDefaultOptions = astro.WorkerQueueDefaultOptions{
MinWorkerCount: astro.WorkerQueueOption{
Floor: 1,
Ceiling: 20,
Default: 5,
},
MaxWorkerCount: astro.WorkerQueueOption{
Floor: 16,
Ceiling: 200,
Default: 125,
},
WorkerConcurrency: astro.WorkerQueueOption{
Floor: 175,
Ceiling: 275,
Default: 180,
},
}
emails = []string{"test1@test.com", "test2@test.com"}
mockAlertEmailResponse = astro.DeploymentAlerts{AlertEmails: emails}
existingDeployment := astro.Deployment{
ID: "test-deployment-id",
Label: "test-deployment-label",
Description: "description",
Cluster: astro.Cluster{
ID: "test-cluster-id",
Name: "test-cluster",
NodePools: []astro.NodePool{
{
ID: "test-pool-id",
IsDefault: false,
NodeInstanceType: "test-worker-1",
},
{
ID: "test-pool-id-2",
IsDefault: false,
NodeInstanceType: "test-worker-2",
},
},
},
}
updatedDeployment := astro.Deployment{
ID: "test-deployment-id",
Label: "test-deployment-label",
Description: "description 1",
}
fileutil.WriteStringToFile(filePath, data)
defer afero.NewOsFs().Remove(filePath)
mockClient.On("GetDeploymentConfig").Return(astro.DeploymentConfig{
AstroMachines: []astro.Machine{
{
Type: "a5",
ConcurrentTasks: 5,
ConcurrentTasksMax: 15,
},
{
Type: "a10",
ConcurrentTasks: 10,
ConcurrentTasksMax: 30,
},
{
Type: "a20",
ConcurrentTasks: 20,
ConcurrentTasksMax: 60,
},
},
Components: astro.Components{
Scheduler: astro.SchedulerConfig{
AU: astro.AuConfig{
Default: 5,
Limit: 24,
},
Replicas: astro.ReplicasConfig{
Default: 1,
Minimum: 1,
Limit: 4,
},
},
},
}, nil).Once()
mockClient.On("ListDeployments", orgID, "").Return([]astro.Deployment{existingDeployment}, nil).Once()
mockClient.On("GetWorkerQueueOptions").Return(mockWorkerQueueDefaultOptions, nil).Once()
mockClient.On("UpdateDeployment", mock.Anything).Return(updatedDeployment, nil)
mockClient.On("ModifyDeploymentVariable", mock.Anything).Return(mockEnvVarResponse, nil)
mockClient.On("UpdateAlertEmails", mock.Anything).Return(mockAlertEmailResponse, nil)
mockClient.On("ListDeployments", orgID, "").Return([]astro.Deployment{updatedDeployment}, nil)
depIds := []string{createdDeployment.ID}
deploymentListParams := &astrocore.ListDeploymentsParams{
DeploymentIds: &depIds,
}
mockCoreClient.On("ListDeploymentsWithResponse", mock.Anything, mock.Anything, deploymentListParams).Return(&mockListDeploymentsResponse, nil).Once()
err = CreateOrUpdate("deployment.yaml", "update", mockClient, mockCoreClient, out)
assert.NoError(t, err)
assert.Contains(t, out.String(), "configuration:\n name: "+existingDeployment.Label)
assert.Contains(t, out.String(), "\n description: "+updatedDeployment.Description)
assert.Contains(t, out.String(), "metadata:\n deployment_id: "+existingDeployment.ID)
mockClient.AssertExpectations(t)
mockCoreClient.AssertExpectations(t)
})
t.Run("return an error when enabling dag deploy for ci-cd enforced deployment", func(t *testing.T) {
testUtil.InitTestConfig(testUtil.CloudPlatform)
out := new(bytes.Buffer)
Expand Down

0 comments on commit f142e1d

Please sign in to comment.