diff --git a/cloud/deployment/fromfile/fromfile.go b/cloud/deployment/fromfile/fromfile.go index 388ae912f..d5e7744f5 100644 --- a/cloud/deployment/fromfile/fromfile.go +++ b/cloud/deployment/fromfile/fromfile.go @@ -57,6 +57,7 @@ func CreateOrUpdate(inputFile, action string, client astro.Client, coreClient as existingDeployments []astro.Deployment nodePools []astrocore.NodePool jsonOutput bool + dagDeploy bool ) // get file contents as []byte @@ -118,6 +119,16 @@ func CreateOrUpdate(inputFile, action string, client astro.Client, coreClient as if err != nil { return err } + // get correct value for dag deploy + if formattedDeployment.Deployment.Configuration.DagDeployEnabled == nil { + if organization.IsOrgHosted() { + dagDeploy = true + } else { + dagDeploy = false + } + } else { + dagDeploy = *formattedDeployment.Deployment.Configuration.DagDeployEnabled + } // check if deployment exists if deploymentExists(existingDeployments, formattedDeployment.Deployment.Configuration.Name) { // create does not allow updating existing deployments @@ -127,7 +138,7 @@ func CreateOrUpdate(inputFile, action string, client astro.Client, coreClient as } // this deployment does not exist so create it // transform formattedDeployment to DeploymentCreateInput - createInput, _, err = getCreateOrUpdateInput(&formattedDeployment, clusterID, workspaceID, createAction, &astro.Deployment{}, nodePools, client) + createInput, _, err = getCreateOrUpdateInput(&formattedDeployment, clusterID, workspaceID, createAction, &astro.Deployment{}, nodePools, dagDeploy, client) if err != nil { return err } @@ -148,8 +159,15 @@ func CreateOrUpdate(inputFile, action string, client astro.Client, coreClient as existingDeployment = deploymentFromName(existingDeployments, formattedDeployment.Deployment.Configuration.Name) workspaceID = existingDeployment.Workspace.ID + // determine dagDeploy + if formattedDeployment.Deployment.Configuration.DagDeployEnabled == nil { + dagDeploy = existingDeployment.DagDeployEnabled + } else { + dagDeploy = *formattedDeployment.Deployment.Configuration.DagDeployEnabled + } + // transform formattedDeployment to DeploymentUpdateInput - _, updateInput, err = getCreateOrUpdateInput(&formattedDeployment, clusterID, workspaceID, updateAction, &existingDeployment, nodePools, client) + _, updateInput, err = getCreateOrUpdateInput(&formattedDeployment, clusterID, workspaceID, updateAction, &existingDeployment, nodePools, dagDeploy, client) if err != nil { return err } @@ -200,7 +218,7 @@ func CreateOrUpdate(inputFile, action string, client astro.Client, coreClient as // It returns an error if getting default options fail. // It returns an error if worker-queue options are not valid. // It returns an error if node pool id could not be found for the worker type. -func getCreateOrUpdateInput(deploymentFromFile *inspect.FormattedDeployment, clusterID, workspaceID, action string, existingDeployment *astro.Deployment, nodePools []astrocore.NodePool, client astro.Client) (astro.CreateDeploymentInput, astro.UpdateDeploymentInput, error) { //nolint +func getCreateOrUpdateInput(deploymentFromFile *inspect.FormattedDeployment, clusterID, workspaceID, action string, existingDeployment *astro.Deployment, nodePools []astrocore.NodePool, dagDeploy bool, client astro.Client) (astro.CreateDeploymentInput, astro.UpdateDeploymentInput, error) { //nolint var ( defaultOptions astro.WorkerQueueDefaultOptions configOptions astro.DeploymentConfig @@ -279,7 +297,7 @@ func getCreateOrUpdateInput(deploymentFromFile *inspect.FormattedDeployment, clu Label: deploymentFromFile.Deployment.Configuration.Name, Description: deploymentFromFile.Deployment.Configuration.Description, RuntimeReleaseVersion: deploymentFromFile.Deployment.Configuration.RunTimeVersion, - DagDeployEnabled: deploymentFromFile.Deployment.Configuration.DagDeployEnabled, + DagDeployEnabled: dagDeploy, SchedulerSize: deploymentFromFile.Deployment.Configuration.SchedulerSize, APIKeyOnlyDeployments: deploymentFromFile.Deployment.Configuration.APIKeyOnlyDeployments, IsHighAvailability: deploymentFromFile.Deployment.Configuration.IsHighAvailability, @@ -303,7 +321,7 @@ func getCreateOrUpdateInput(deploymentFromFile *inspect.FormattedDeployment, clu ClusterID: clusterID, Label: deploymentFromFile.Deployment.Configuration.Name, Description: deploymentFromFile.Deployment.Configuration.Description, - DagDeployEnabled: deploymentFromFile.Deployment.Configuration.DagDeployEnabled, + DagDeployEnabled: dagDeploy, SchedulerSize: deploymentFromFile.Deployment.Configuration.SchedulerSize, APIKeyOnlyDeployments: deploymentFromFile.Deployment.Configuration.APIKeyOnlyDeployments, IsHighAvailability: deploymentFromFile.Deployment.Configuration.IsHighAvailability, diff --git a/cloud/deployment/fromfile/fromfile_test.go b/cloud/deployment/fromfile/fromfile_test.go index 367552b98..e0c682c25 100644 --- a/cloud/deployment/fromfile/fromfile_test.go +++ b/cloud/deployment/fromfile/fromfile_test.go @@ -882,7 +882,6 @@ deployment: name: test-deployment-label description: description runtime_version: 6.0.0 - dag_deploy_enabled: true executor: CeleryExecutor scheduler_au: 5 scheduler_count: 3 @@ -1859,7 +1858,6 @@ deployment: name: test-deployment-label description: description 1 runtime_version: 6.0.0 - dag_deploy_enabled: true executor: CeleryExecutor scheduler_au: 5 scheduler_count: 3 @@ -2562,6 +2560,7 @@ func TestGetCreateOrUpdateInput(t *testing.T) { deploymentFromFile.Deployment.Configuration.SchedulerAU = 4 deploymentFromFile.Deployment.Configuration.SchedulerCount = 2 deploymentFromFile.Deployment.Configuration.Executor = deployment.CeleryExecutor + dagDeploy := true minCount := 3 qList = []inspect.Workerq{ { @@ -2612,7 +2611,7 @@ func TestGetCreateOrUpdateInput(t *testing.T) { expectedDeploymentInput = astro.CreateDeploymentInput{} mockClient := new(astro_mocks.Client) - actualCreateInput, _, err = getCreateOrUpdateInput(&deploymentFromFile, clusterID, workspaceID, "create", &astro.Deployment{}, existingPools, mockClient) + actualCreateInput, _, err = getCreateOrUpdateInput(&deploymentFromFile, clusterID, workspaceID, "create", &astro.Deployment{}, existingPools, dagDeploy, mockClient) assert.ErrorContains(t, err, "worker_type: test-worker-8 does not exist in cluster: test-cluster") assert.Equal(t, expectedDeploymentInput, actualCreateInput) mockClient.AssertExpectations(t) @@ -2628,6 +2627,7 @@ func TestGetCreateOrUpdateInput(t *testing.T) { deploymentFromFile.Deployment.Configuration.SchedulerAU = 4 deploymentFromFile.Deployment.Configuration.SchedulerCount = 2 deploymentFromFile.Deployment.Configuration.Executor = deployment.CeleryExecutor + dagDeploy := true minCountThirty := 30 minCountThree := 3 qList = []inspect.Workerq{ @@ -2680,7 +2680,7 @@ func TestGetCreateOrUpdateInput(t *testing.T) { expectedDeploymentInput = astro.CreateDeploymentInput{} mockClient := new(astro_mocks.Client) mockClient.On("GetWorkerQueueOptions").Return(mockWorkerQueueDefaultOptions, nil).Once() - actualCreateInput, _, err = getCreateOrUpdateInput(&deploymentFromFile, clusterID, workspaceID, "create", &astro.Deployment{}, existingPools, mockClient) + actualCreateInput, _, err = getCreateOrUpdateInput(&deploymentFromFile, clusterID, workspaceID, "create", &astro.Deployment{}, existingPools, dagDeploy, mockClient) assert.ErrorContains(t, err, "worker queue option is invalid: min worker count") assert.Equal(t, expectedDeploymentInput, actualCreateInput) mockClient.AssertExpectations(t) @@ -2695,6 +2695,7 @@ func TestGetCreateOrUpdateInput(t *testing.T) { deploymentFromFile.Deployment.Configuration.SchedulerAU = 4 deploymentFromFile.Deployment.Configuration.SchedulerCount = 2 deploymentFromFile.Deployment.Configuration.Executor = deployment.CeleryExecutor + dagDeploy := true minCountThirty := 30 minCountThree := 3 qList = []inspect.Workerq{ @@ -2729,7 +2730,7 @@ func TestGetCreateOrUpdateInput(t *testing.T) { expectedDeploymentInput = astro.CreateDeploymentInput{} mockClient := new(astro_mocks.Client) mockClient.On("GetWorkerQueueOptions").Return(astro.WorkerQueueDefaultOptions{}, errTest).Once() - actualCreateInput, _, err = getCreateOrUpdateInput(&deploymentFromFile, clusterID, workspaceID, "create", &astro.Deployment{}, existingPools, mockClient) + actualCreateInput, _, err = getCreateOrUpdateInput(&deploymentFromFile, clusterID, workspaceID, "create", &astro.Deployment{}, existingPools, dagDeploy, mockClient) assert.ErrorContains(t, err, "failed to get worker queue default options") assert.Equal(t, expectedDeploymentInput, actualCreateInput) mockClient.AssertExpectations(t) @@ -2743,7 +2744,10 @@ func TestGetCreateOrUpdateInput(t *testing.T) { deploymentFromFile.Deployment.Configuration.RunTimeVersion = "test-runtime-v" deploymentFromFile.Deployment.Configuration.SchedulerAU = 4 deploymentFromFile.Deployment.Configuration.SchedulerCount = 2 + dagDeploy := true + deploymentFromFile.Deployment.Configuration.DagDeployEnabled = &dagDeploy deploymentFromFile.Deployment.Configuration.Executor = deployment.CeleryExecutor + minCount := -1 qList = []inspect.Workerq{ { @@ -2812,7 +2816,7 @@ func TestGetCreateOrUpdateInput(t *testing.T) { Label: deploymentFromFile.Deployment.Configuration.Name, Description: deploymentFromFile.Deployment.Configuration.Description, RuntimeReleaseVersion: deploymentFromFile.Deployment.Configuration.RunTimeVersion, - DagDeployEnabled: deploymentFromFile.Deployment.Configuration.DagDeployEnabled, + DagDeployEnabled: *deploymentFromFile.Deployment.Configuration.DagDeployEnabled, DeploymentSpec: astro.DeploymentCreateSpec{ Executor: deployment.CeleryExecutor, Scheduler: astro.Scheduler{ @@ -2824,7 +2828,7 @@ func TestGetCreateOrUpdateInput(t *testing.T) { } mockClient := new(astro_mocks.Client) mockClient.On("GetWorkerQueueOptions").Return(mockWorkerQueueDefaultOptions, nil).Once() - actualCreateInput, _, err = getCreateOrUpdateInput(&deploymentFromFile, clusterID, workspaceID, "create", &astro.Deployment{}, existingPools, mockClient) + actualCreateInput, _, err = getCreateOrUpdateInput(&deploymentFromFile, clusterID, workspaceID, "create", &astro.Deployment{}, existingPools, dagDeploy, mockClient) assert.NoError(t, err) assert.Equal(t, expectedDeploymentInput, actualCreateInput) mockClient.AssertExpectations(t) @@ -2841,6 +2845,7 @@ func TestGetCreateOrUpdateInput(t *testing.T) { deploymentFromFile.Deployment.Configuration.SchedulerAU = 4 deploymentFromFile.Deployment.Configuration.SchedulerCount = 2 deploymentFromFile.Deployment.Configuration.Executor = deployment.KubeExecutor + dagDeploy := true qList = []inspect.Workerq{ { Name: "default", @@ -2866,7 +2871,7 @@ func TestGetCreateOrUpdateInput(t *testing.T) { } expectedDeploymentInput = astro.CreateDeploymentInput{} mockClient := new(astro_mocks.Client) - actualCreateInput, _, err = getCreateOrUpdateInput(&deploymentFromFile, clusterID, workspaceID, "create", &astro.Deployment{}, existingPools, mockClient) + actualCreateInput, _, err = getCreateOrUpdateInput(&deploymentFromFile, clusterID, workspaceID, "create", &astro.Deployment{}, existingPools, dagDeploy, mockClient) assert.ErrorContains(t, err, "KubernetesExecutor does not support more than one worker queue. (2) were requested") assert.Equal(t, expectedDeploymentInput, actualCreateInput) mockClient.AssertExpectations(t) @@ -2881,6 +2886,7 @@ func TestGetCreateOrUpdateInput(t *testing.T) { deploymentFromFile.Deployment.Configuration.SchedulerAU = 4 deploymentFromFile.Deployment.Configuration.SchedulerCount = 2 deploymentFromFile.Deployment.Configuration.Executor = deployment.KubeExecutor + dagDeploy := true minCount := 10 qList = []inspect.Workerq{ { @@ -2904,7 +2910,7 @@ func TestGetCreateOrUpdateInput(t *testing.T) { } expectedDeploymentInput = astro.CreateDeploymentInput{} mockClient := new(astro_mocks.Client) - actualCreateInput, _, err = getCreateOrUpdateInput(&deploymentFromFile, clusterID, workspaceID, "create", &astro.Deployment{}, existingPools, mockClient) + actualCreateInput, _, err = getCreateOrUpdateInput(&deploymentFromFile, clusterID, workspaceID, "create", &astro.Deployment{}, existingPools, dagDeploy, mockClient) assert.ErrorContains(t, err, "KubernetesExecutor does not support minimum worker count in the request. It can only be used with CeleryExecutor") assert.Equal(t, expectedDeploymentInput, actualCreateInput) mockClient.AssertExpectations(t) @@ -2919,6 +2925,7 @@ func TestGetCreateOrUpdateInput(t *testing.T) { deploymentFromFile.Deployment.Configuration.SchedulerAU = 4 deploymentFromFile.Deployment.Configuration.SchedulerCount = 2 deploymentFromFile.Deployment.Configuration.Executor = deployment.KubeExecutor + dagDeploy := true minCount := -1 qList = []inspect.Workerq{ { @@ -2943,7 +2950,7 @@ func TestGetCreateOrUpdateInput(t *testing.T) { } expectedDeploymentInput = astro.CreateDeploymentInput{} mockClient := new(astro_mocks.Client) - actualCreateInput, _, err = getCreateOrUpdateInput(&deploymentFromFile, clusterID, workspaceID, "create", &astro.Deployment{}, existingPools, mockClient) + actualCreateInput, _, err = getCreateOrUpdateInput(&deploymentFromFile, clusterID, workspaceID, "create", &astro.Deployment{}, existingPools, dagDeploy, mockClient) assert.ErrorContains(t, err, "KubernetesExecutor does not support maximum worker count in the request. It can only be used with CeleryExecutor") assert.Equal(t, expectedDeploymentInput, actualCreateInput) mockClient.AssertExpectations(t) @@ -2958,6 +2965,7 @@ func TestGetCreateOrUpdateInput(t *testing.T) { deploymentFromFile.Deployment.Configuration.SchedulerAU = 4 deploymentFromFile.Deployment.Configuration.SchedulerCount = 2 deploymentFromFile.Deployment.Configuration.Executor = deployment.KubeExecutor + dagDeploy := true minCount := -1 qList = []inspect.Workerq{ { @@ -2982,7 +2990,7 @@ func TestGetCreateOrUpdateInput(t *testing.T) { } expectedDeploymentInput = astro.CreateDeploymentInput{} mockClient := new(astro_mocks.Client) - actualCreateInput, _, err = getCreateOrUpdateInput(&deploymentFromFile, clusterID, workspaceID, "create", &astro.Deployment{}, existingPools, mockClient) + actualCreateInput, _, err = getCreateOrUpdateInput(&deploymentFromFile, clusterID, workspaceID, "create", &astro.Deployment{}, existingPools, dagDeploy, mockClient) assert.ErrorContains(t, err, "KubernetesExecutor does not support worker concurrency in the request. It can only be used with CeleryExecutor") assert.Equal(t, expectedDeploymentInput, actualCreateInput) mockClient.AssertExpectations(t) @@ -2997,6 +3005,7 @@ func TestGetCreateOrUpdateInput(t *testing.T) { deploymentFromFile.Deployment.Configuration.SchedulerAU = 4 deploymentFromFile.Deployment.Configuration.SchedulerCount = 2 deploymentFromFile.Deployment.Configuration.Executor = deployment.KubeExecutor + dagDeploy := true qList = []inspect.Workerq{ { Name: "default", @@ -3019,7 +3028,7 @@ func TestGetCreateOrUpdateInput(t *testing.T) { } expectedDeploymentInput = astro.CreateDeploymentInput{} mockClient := new(astro_mocks.Client) - actualCreateInput, _, err = getCreateOrUpdateInput(&deploymentFromFile, clusterID, workspaceID, "create", &astro.Deployment{}, existingPools, mockClient) + actualCreateInput, _, err = getCreateOrUpdateInput(&deploymentFromFile, clusterID, workspaceID, "create", &astro.Deployment{}, existingPools, dagDeploy, mockClient) assert.ErrorContains(t, err, "KubernetesExecutor does not support pod ram in the request. It will be calculated based on the requested worker type") assert.Equal(t, expectedDeploymentInput, actualCreateInput) mockClient.AssertExpectations(t) @@ -3037,6 +3046,8 @@ func TestGetCreateOrUpdateInput(t *testing.T) { deploymentFromFile.Deployment.Configuration.SchedulerAU = 4 deploymentFromFile.Deployment.Configuration.SchedulerCount = 2 deploymentFromFile.Deployment.Configuration.Executor = deployment.CeleryExecutor + dagDeploy := true + deploymentFromFile.Deployment.Configuration.DagDeployEnabled = &dagDeploy expectedDeploymentInput = astro.CreateDeploymentInput{ WorkspaceID: workspaceID, @@ -3044,7 +3055,7 @@ func TestGetCreateOrUpdateInput(t *testing.T) { Label: deploymentFromFile.Deployment.Configuration.Name, Description: deploymentFromFile.Deployment.Configuration.Description, RuntimeReleaseVersion: deploymentFromFile.Deployment.Configuration.RunTimeVersion, - DagDeployEnabled: deploymentFromFile.Deployment.Configuration.DagDeployEnabled, + DagDeployEnabled: *deploymentFromFile.Deployment.Configuration.DagDeployEnabled, DeploymentSpec: astro.DeploymentCreateSpec{ Executor: deployment.CeleryExecutor, Scheduler: astro.Scheduler{ @@ -3055,7 +3066,7 @@ func TestGetCreateOrUpdateInput(t *testing.T) { WorkerQueues: nil, } mockClient := new(astro_mocks.Client) - actualCreateInput, _, err = getCreateOrUpdateInput(&deploymentFromFile, clusterID, workspaceID, "create", &astro.Deployment{}, nil, mockClient) + actualCreateInput, _, err = getCreateOrUpdateInput(&deploymentFromFile, clusterID, workspaceID, "create", &astro.Deployment{}, nil, dagDeploy, mockClient) assert.NoError(t, err) assert.Equal(t, expectedDeploymentInput, actualCreateInput) mockClient.AssertExpectations(t) @@ -3070,6 +3081,8 @@ func TestGetCreateOrUpdateInput(t *testing.T) { deploymentFromFile.Deployment.Configuration.SchedulerAU = 4 deploymentFromFile.Deployment.Configuration.SchedulerCount = 2 deploymentFromFile.Deployment.Configuration.Executor = deployment.KubeExecutor + dagDeploy := true + deploymentFromFile.Deployment.Configuration.DagDeployEnabled = &dagDeploy minCount := -1 qList = []inspect.Workerq{ { @@ -3105,7 +3118,7 @@ func TestGetCreateOrUpdateInput(t *testing.T) { Label: deploymentFromFile.Deployment.Configuration.Name, Description: deploymentFromFile.Deployment.Configuration.Description, RuntimeReleaseVersion: deploymentFromFile.Deployment.Configuration.RunTimeVersion, - DagDeployEnabled: deploymentFromFile.Deployment.Configuration.DagDeployEnabled, + DagDeployEnabled: *deploymentFromFile.Deployment.Configuration.DagDeployEnabled, DeploymentSpec: astro.DeploymentCreateSpec{ Executor: deployment.KubeExecutor, Scheduler: astro.Scheduler{ @@ -3116,7 +3129,7 @@ func TestGetCreateOrUpdateInput(t *testing.T) { WorkerQueues: expectedQList, } mockClient := new(astro_mocks.Client) - actualCreateInput, _, err = getCreateOrUpdateInput(&deploymentFromFile, clusterID, workspaceID, "create", &astro.Deployment{}, existingPools, mockClient) + actualCreateInput, _, err = getCreateOrUpdateInput(&deploymentFromFile, clusterID, workspaceID, "create", &astro.Deployment{}, existingPools, dagDeploy, mockClient) assert.NoError(t, err) assert.Equal(t, expectedDeploymentInput, actualCreateInput) mockClient.AssertExpectations(t) @@ -3131,6 +3144,8 @@ func TestGetCreateOrUpdateInput(t *testing.T) { deploymentFromFile.Deployment.Configuration.SchedulerAU = 4 deploymentFromFile.Deployment.Configuration.SchedulerCount = 2 deploymentFromFile.Deployment.Configuration.Executor = deployment.CeleryExecutor + dagDeploy := true + deploymentFromFile.Deployment.Configuration.DagDeployEnabled = &dagDeploy minCount := 3 qList = []inspect.Workerq{ { @@ -3203,7 +3218,7 @@ func TestGetCreateOrUpdateInput(t *testing.T) { Label: deploymentFromFile.Deployment.Configuration.Name, Description: deploymentFromFile.Deployment.Configuration.Description, RuntimeReleaseVersion: deploymentFromFile.Deployment.Configuration.RunTimeVersion, - DagDeployEnabled: deploymentFromFile.Deployment.Configuration.DagDeployEnabled, + DagDeployEnabled: *deploymentFromFile.Deployment.Configuration.DagDeployEnabled, DeploymentSpec: astro.DeploymentCreateSpec{ Executor: deployment.CeleryExecutor, Scheduler: astro.Scheduler{ @@ -3215,7 +3230,7 @@ func TestGetCreateOrUpdateInput(t *testing.T) { } mockClient := new(astro_mocks.Client) mockClient.On("GetWorkerQueueOptions").Return(mockWorkerQueueDefaultOptions, nil).Once() - actualCreateInput, _, err = getCreateOrUpdateInput(&deploymentFromFile, clusterID, workspaceID, "create", &astro.Deployment{}, existingPools, mockClient) + actualCreateInput, _, err = getCreateOrUpdateInput(&deploymentFromFile, clusterID, workspaceID, "create", &astro.Deployment{}, existingPools, dagDeploy, mockClient) assert.NoError(t, err) assert.Equal(t, expectedDeploymentInput, actualCreateInput) mockClient.AssertExpectations(t) @@ -3233,6 +3248,8 @@ func TestGetCreateOrUpdateInput(t *testing.T) { deploymentFromFile.Deployment.Configuration.SchedulerAU = 4 deploymentFromFile.Deployment.Configuration.SchedulerCount = 2 deploymentFromFile.Deployment.Configuration.Executor = deployment.CeleryExecutor + dagDeploy := true + deploymentFromFile.Deployment.Configuration.DagDeployEnabled = &dagDeploy existingDeployment := astro.Deployment{ ID: deploymentID, Label: "test-deployment", @@ -3246,7 +3263,7 @@ func TestGetCreateOrUpdateInput(t *testing.T) { ClusterID: clusterID, Label: deploymentFromFile.Deployment.Configuration.Name, Description: deploymentFromFile.Deployment.Configuration.Description, - DagDeployEnabled: deploymentFromFile.Deployment.Configuration.DagDeployEnabled, + DagDeployEnabled: *deploymentFromFile.Deployment.Configuration.DagDeployEnabled, DeploymentSpec: astro.DeploymentCreateSpec{ Executor: "CeleryExecutor", Scheduler: astro.Scheduler{ @@ -3257,7 +3274,7 @@ func TestGetCreateOrUpdateInput(t *testing.T) { WorkerQueues: nil, } mockClient := new(astro_mocks.Client) - _, actualUpdateInput, err = getCreateOrUpdateInput(&deploymentFromFile, clusterID, workspaceID, "update", &existingDeployment, nil, mockClient) + _, actualUpdateInput, err = getCreateOrUpdateInput(&deploymentFromFile, clusterID, workspaceID, "update", &existingDeployment, nil, dagDeploy, mockClient) assert.NoError(t, err) assert.Equal(t, expectedUpdateDeploymentInput, actualUpdateInput) mockClient.AssertExpectations(t) @@ -3273,6 +3290,7 @@ func TestGetCreateOrUpdateInput(t *testing.T) { deploymentFromFile.Deployment.Configuration.SchedulerAU = 4 deploymentFromFile.Deployment.Configuration.SchedulerCount = 2 deploymentFromFile.Deployment.Configuration.Executor = deployment.CeleryExecutor + dagDeploy := true existingDeployment := astro.Deployment{ ID: deploymentID, Label: "test-deployment", @@ -3283,7 +3301,7 @@ func TestGetCreateOrUpdateInput(t *testing.T) { expectedUpdateDeploymentInput = astro.UpdateDeploymentInput{} mockClient := new(astro_mocks.Client) - _, actualUpdateInput, err = getCreateOrUpdateInput(&deploymentFromFile, "diff-cluster", workspaceID, "update", &existingDeployment, nil, mockClient) + _, actualUpdateInput, err = getCreateOrUpdateInput(&deploymentFromFile, "diff-cluster", workspaceID, "update", &existingDeployment, nil, dagDeploy, mockClient) assert.ErrorIs(t, err, errNotPermitted) assert.ErrorContains(t, err, "changing an existing deployment's cluster is not permitted") assert.Equal(t, expectedUpdateDeploymentInput, actualUpdateInput) @@ -3300,6 +3318,8 @@ func TestGetCreateOrUpdateInput(t *testing.T) { deploymentFromFile.Deployment.Configuration.SchedulerAU = 4 deploymentFromFile.Deployment.Configuration.SchedulerCount = 2 deploymentFromFile.Deployment.Configuration.Executor = deployment.KubeExecutor + dagDeploy := true + deploymentFromFile.Deployment.Configuration.DagDeployEnabled = &dagDeploy existingPools := []astro.NodePool{ { @@ -3331,7 +3351,7 @@ func TestGetCreateOrUpdateInput(t *testing.T) { ClusterID: clusterID, Label: deploymentFromFile.Deployment.Configuration.Name, Description: deploymentFromFile.Deployment.Configuration.Description, - DagDeployEnabled: deploymentFromFile.Deployment.Configuration.DagDeployEnabled, + DagDeployEnabled: *deploymentFromFile.Deployment.Configuration.DagDeployEnabled, DeploymentSpec: astro.DeploymentCreateSpec{ Executor: deploymentFromFile.Deployment.Configuration.Executor, Scheduler: astro.Scheduler{ @@ -3342,7 +3362,7 @@ func TestGetCreateOrUpdateInput(t *testing.T) { WorkerQueues: nil, // a default queue is created by the api } mockClient := new(astro_mocks.Client) - _, actualUpdateInput, err = getCreateOrUpdateInput(&deploymentFromFile, clusterID, workspaceID, "update", &existingDeployment, nil, mockClient) + _, actualUpdateInput, err = getCreateOrUpdateInput(&deploymentFromFile, clusterID, workspaceID, "update", &existingDeployment, nil, dagDeploy, mockClient) assert.NoError(t, err) assert.Equal(t, expectedUpdateDeploymentInput, actualUpdateInput) mockClient.AssertExpectations(t) @@ -3358,6 +3378,8 @@ func TestGetCreateOrUpdateInput(t *testing.T) { deploymentFromFile.Deployment.Configuration.SchedulerAU = 4 deploymentFromFile.Deployment.Configuration.SchedulerCount = 2 deploymentFromFile.Deployment.Configuration.Executor = deployment.KubeExecutor + dagDeploy := true + deploymentFromFile.Deployment.Configuration.DagDeployEnabled = &dagDeploy minCount := -1 qList = []inspect.Workerq{ { @@ -3418,7 +3440,7 @@ func TestGetCreateOrUpdateInput(t *testing.T) { ClusterID: clusterID, Label: deploymentFromFile.Deployment.Configuration.Name, Description: deploymentFromFile.Deployment.Configuration.Description, - DagDeployEnabled: deploymentFromFile.Deployment.Configuration.DagDeployEnabled, + DagDeployEnabled: *deploymentFromFile.Deployment.Configuration.DagDeployEnabled, DeploymentSpec: astro.DeploymentCreateSpec{ Executor: deploymentFromFile.Deployment.Configuration.Executor, Scheduler: astro.Scheduler{ @@ -3429,7 +3451,7 @@ func TestGetCreateOrUpdateInput(t *testing.T) { WorkerQueues: expectedQList, } mockClient := new(astro_mocks.Client) - _, actualUpdateInput, err = getCreateOrUpdateInput(&deploymentFromFile, clusterID, workspaceID, "update", &existingDeployment, existingPools, mockClient) + _, actualUpdateInput, err = getCreateOrUpdateInput(&deploymentFromFile, clusterID, workspaceID, "update", &existingDeployment, existingPools, dagDeploy, mockClient) assert.NoError(t, err) assert.Equal(t, expectedUpdateDeploymentInput, actualUpdateInput) mockClient.AssertExpectations(t) @@ -3445,6 +3467,8 @@ func TestGetCreateOrUpdateInput(t *testing.T) { deploymentFromFile.Deployment.Configuration.SchedulerAU = 4 deploymentFromFile.Deployment.Configuration.SchedulerCount = 2 deploymentFromFile.Deployment.Configuration.Executor = deployment.CeleryExecutor + dagDeploy := true + deploymentFromFile.Deployment.Configuration.DagDeployEnabled = &dagDeploy minCount := 3 qList = []inspect.Workerq{ { @@ -3524,7 +3548,7 @@ func TestGetCreateOrUpdateInput(t *testing.T) { ClusterID: clusterID, Label: deploymentFromFile.Deployment.Configuration.Name, Description: deploymentFromFile.Deployment.Configuration.Description, - DagDeployEnabled: deploymentFromFile.Deployment.Configuration.DagDeployEnabled, + DagDeployEnabled: *deploymentFromFile.Deployment.Configuration.DagDeployEnabled, DeploymentSpec: astro.DeploymentCreateSpec{ Executor: "CeleryExecutor", Scheduler: astro.Scheduler{ @@ -3536,7 +3560,7 @@ func TestGetCreateOrUpdateInput(t *testing.T) { } mockClient := new(astro_mocks.Client) mockClient.On("GetWorkerQueueOptions").Return(mockWorkerQueueDefaultOptions, nil).Once() - _, actualUpdateInput, err = getCreateOrUpdateInput(&deploymentFromFile, clusterID, workspaceID, "update", &existingDeployment, existingPools, mockClient) + _, actualUpdateInput, err = getCreateOrUpdateInput(&deploymentFromFile, clusterID, workspaceID, "update", &existingDeployment, existingPools, dagDeploy, mockClient) assert.NoError(t, err) assert.Equal(t, expectedUpdateDeploymentInput, actualUpdateInput) mockClient.AssertExpectations(t) diff --git a/cloud/deployment/inspect/inspect.go b/cloud/deployment/inspect/inspect.go index f86d5391b..c20fccbc7 100644 --- a/cloud/deployment/inspect/inspect.go +++ b/cloud/deployment/inspect/inspect.go @@ -37,7 +37,7 @@ type deploymentConfig struct { Name string `mapstructure:"name" yaml:"name" json:"name"` Description string `mapstructure:"description" yaml:"description" json:"description"` RunTimeVersion string `mapstructure:"runtime_version" yaml:"runtime_version" json:"runtime_version"` - DagDeployEnabled bool `mapstructure:"dag_deploy_enabled" yaml:"dag_deploy_enabled" json:"dag_deploy_enabled"` + DagDeployEnabled *bool `mapstructure:"dag_deploy_enabled" yaml:"dag_deploy_enabled" json:"dag_deploy_enabled"` APIKeyOnlyDeployments bool `mapstructure:"ci_cd_enforcement" yaml:"ci_cd_enforcement" json:"ci_cd_enforcement"` SchedulerSize string `mapstructure:"scheduler_size" yaml:"scheduler_size" json:"scheduler_size"` IsHighAvailability bool `mapstructure:"is_high_availability" yaml:"is_high_availability" json:"is_high_availability"` diff --git a/cloud/deployment/inspect/inspect_test.go b/cloud/deployment/inspect/inspect_test.go index 55a1e80e1..e676e3928 100644 --- a/cloud/deployment/inspect/inspect_test.go +++ b/cloud/deployment/inspect/inspect_test.go @@ -562,7 +562,7 @@ func TestGetDeploymentConfig(t *testing.T) { RunTimeVersion: sourceDeployment.RuntimeRelease.Version, SchedulerAU: sourceDeployment.DeploymentSpec.Scheduler.AU, SchedulerCount: sourceDeployment.DeploymentSpec.Scheduler.Replicas, - DagDeployEnabled: sourceDeployment.DagDeployEnabled, + DagDeployEnabled: &sourceDeployment.DagDeployEnabled, Executor: sourceDeployment.DeploymentSpec.Executor, } rawDeploymentConfig := getDeploymentConfig(&sourceDeployment) diff --git a/cmd/cloud/deployment.go b/cmd/cloud/deployment.go index 2f887c31e..dc528c4f3 100644 --- a/cmd/cloud/deployment.go +++ b/cmd/cloud/deployment.go @@ -147,7 +147,7 @@ func newDeploymentCreateCmd(out io.Writer) *cobra.Command { cmd.Flags().StringVarP(&workspaceID, "workspace-id", "w", "", "Workspace to create the Deployment in") cmd.Flags().StringVarP(&description, "description", "d", "", "Description of the Deployment. If the description contains a space, specify the entire description in quotes \"\"") cmd.Flags().StringVarP(&runtimeVersion, "runtime-version", "v", "", "Runtime version for the Deployment") - cmd.Flags().StringVarP(&dagDeploy, "dag-deploy", "", "disable", "Enables DAG-only deploys for the Deployment") + cmd.Flags().StringVarP(&dagDeploy, "dag-deploy", "", "", "Enables DAG-only deploys for the Deployment") cmd.Flags().StringVarP(&executor, "executor", "e", "", "The executor to use for the Deployment. Possible values can be CeleryExecutor or KubernetesExecutor.") cmd.Flags().StringVarP(&inputFile, "deployment-file", "", "", "Location of file containing the Deployment to create. File can be in either JSON or YAML format.") cmd.Flags().BoolVarP(&waitForStatus, "wait", "i", false, "Wait for the Deployment to become healthy before ending the command") @@ -324,7 +324,7 @@ func deploymentLogs(cmd *cobra.Command, args []string) error { return deployment.Logs(deploymentID, ws, deploymentName, warnLogs, errorLogs, infoLogs, logCount, astroClient) } -func deploymentCreate(cmd *cobra.Command, _ []string, out io.Writer) error { +func deploymentCreate(cmd *cobra.Command, _ []string, out io.Writer) error { //nolint:gocognit // Find Workspace ID ws, err := coalesceWorkspace() if err != nil { @@ -365,6 +365,13 @@ func deploymentCreate(cmd *cobra.Command, _ []string, out io.Writer) error { if dagDeploy != "" && !(dagDeploy == enable || dagDeploy == disable) { return errors.New("Invalid --dag-deploy value)") } + if dagDeploy == "" { + if organization.IsOrgHosted() { + dagDeploy = enable + } else { + dagDeploy = disable + } + } // Get latest runtime version if runtimeVersion == "" {