Skip to content

Commit

Permalink
Adding wait flag for astro deploy to be healthy (#1293)
Browse files Browse the repository at this point in the history
* Adding wait flag for astro deploy to be healthy

* re-use function

* Add wait for status for dag only deploy

* Fixing lint

* Adding tests to bump coverage

* Fixing lint
  • Loading branch information
kushalmalani authored Jul 10, 2023
1 parent b76e610 commit f4e6650
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 11 deletions.
30 changes: 30 additions & 0 deletions cloud/deploy/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,13 @@ var (
envFileMissing = errors.New("Env file path is incorrect: ") //nolint:revive
)

var (
sleepTime = 90
dagOnlyDeploySleepTime = 30
tickNum = 10
timeoutNum = 180
)

type deploymentInfo struct {
deploymentID string
namespace string
Expand All @@ -90,6 +97,7 @@ type InputDeploy struct {
DeploymentName string
Prompt bool
Dags bool
WaitForStatus bool
DagsPath string
}

Expand Down Expand Up @@ -267,6 +275,21 @@ func Deploy(deployInput InputDeploy, client astro.Client, coreClient astrocore.C
return err
}

if deployInput.WaitForStatus {
// Keeping wait timeout low since dag only deploy is faster
err = deployment.HealthPoll(deployInfo.deploymentID, deployInfo.workspaceID, dagOnlyDeploySleepTime, tickNum, timeoutNum, coreClient)
if err != nil {
return err
}

fmt.Println("\nSuccessfully uploaded DAGs with version " + ansi.Bold(versionID) + " to Astro. Navigate to the Airflow UI to confirm that your deploy was successful." +
"\n\n Access your Deployment: \n" +
fmt.Sprintf("\n Deployment View: %s", ansi.Bold(deploymentURL)) +
fmt.Sprintf("\n Airflow UI: %s", ansi.Bold(deployInfo.webserverURL)))

return nil
}

fmt.Println("\nSuccessfully uploaded DAGs with version " + ansi.Bold(versionID) + " to Astro. Navigate to the Airflow UI to confirm that your deploy was successful. The Airflow UI takes about 1 minute to update." +
"\n\n Access your Deployment: \n" +
fmt.Sprintf("\n Deployment View: %s", ansi.Bold(deploymentURL)) +
Expand Down Expand Up @@ -343,6 +366,13 @@ func Deploy(deployInput InputDeploy, client astro.Client, coreClient astrocore.C
}
}

if deployInput.WaitForStatus {
err = deployment.HealthPoll(deployInfo.deploymentID, deployInfo.workspaceID, sleepTime, tickNum, timeoutNum, coreClient)
if err != nil {
return err
}
}

fmt.Println("Successfully pushed image to Astronomer registry. Navigate to the Astronomer UI for confirmation that your deploy was successful." +
"\n\n Access your Deployment: \n" +
fmt.Sprintf("\n Deployment View: %s", ansi.Bold("https://"+deploymentURL)) +
Expand Down
50 changes: 42 additions & 8 deletions cloud/deploy/deploy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ var (
initiatedDagDeploymentID = "test-dag-deployment-id"
runtimeID = "test-id"
dagURL = "http://fake-url.windows.core.net"
mockCoreClient = new(astrocore_mocks.ClientWithResponsesInterface)
)

func TestDeployWithoutDagsDeploySuccess(t *testing.T) {
mockCoreClient := new(astrocore_mocks.ClientWithResponsesInterface)
mockDeplyResp := astro.Deployment{
ID: "test-id",
ReleaseName: "test-name",
Expand All @@ -53,17 +53,18 @@ func TestDeployWithoutDagsDeploySuccess(t *testing.T) {
ImageName: "",
DeploymentName: "",
Prompt: true,
WaitForStatus: false,
Dags: false,
}
testUtil.InitTestConfig(testUtil.CloudPlatform)
config.CFG.ShowWarnings.SetHomeString("false")
mockClient := new(astro_mocks.Client)

mockClient.On("GetDeployment", mock.Anything).Return(mockDeplyResp, nil).Times(3)
mockClient.On("GetDeployment", mock.Anything).Return(mockDeplyResp, nil).Times(4)
mockClient.On("ListDeployments", org, ws).Return([]astro.Deployment{{ID: "test-id", Workspace: astro.Workspace{ID: ws}}}, nil).Once()
mockClient.On("GetDeploymentConfig").Return(astro.DeploymentConfig{RuntimeReleases: []astro.RuntimeRelease{{Version: "4.2.5"}}}, nil).Times(4)
mockClient.On("CreateImage", mock.Anything).Return(&astro.Image{}, nil).Times(4)
mockClient.On("DeployImage", mock.Anything).Return(&astro.Image{}, nil).Times(4)
mockClient.On("GetDeploymentConfig").Return(astro.DeploymentConfig{RuntimeReleases: []astro.RuntimeRelease{{Version: "4.2.5"}}}, nil).Times(5)
mockClient.On("CreateImage", mock.Anything).Return(&astro.Image{}, nil).Times(5)
mockClient.On("DeployImage", mock.Anything).Return(&astro.Image{}, nil).Times(5)

mockImageHandler := new(mocks.ImageHandler)
airflowImageHandler = func(image string) airflow.ImageHandler {
Expand Down Expand Up @@ -127,7 +128,16 @@ func TestDeployWithoutDagsDeploySuccess(t *testing.T) {
err = Deploy(deployInput, mockClient, mockCoreClient)
assert.NoError(t, err)

defer testUtil.MockUserInput(t, "y")()
deployInput.Pytest = ""
deployInput.WaitForStatus = true
sleepTime = 1
timeoutNum = 1
err = Deploy(deployInput, mockClient, mockCoreClient)
assert.ErrorContains(t, err, "timed out waiting for the deployment to become healthy")

mockClient.AssertExpectations(t)
mockCoreClient.AssertExpectations(t)
mockImageHandler.AssertExpectations(t)
mockContainerHandler.AssertExpectations(t)
}
Expand All @@ -136,6 +146,7 @@ func TestDeployWithDagsDeploySuccess(t *testing.T) {
os.Mkdir("./testfiles/dags", os.ModePerm)
path := "./testfiles/dags/test.py"
fileutil.WriteStringToFile(path, "testing")
mockCoreClient := new(astrocore_mocks.ClientWithResponsesInterface)

mockDeplyResp := astro.Deployment{
ID: "test-id",
Expand All @@ -157,6 +168,7 @@ func TestDeployWithDagsDeploySuccess(t *testing.T) {
ImageName: "",
DeploymentName: "",
Prompt: true,
WaitForStatus: false,
Dags: false,
}
testUtil.InitTestConfig(testUtil.CloudPlatform)
Expand Down Expand Up @@ -270,6 +282,7 @@ func TestDeployWithDagsDeploySuccess(t *testing.T) {
ImageName: "",
DeploymentName: "",
Prompt: true,
WaitForStatus: false,
Dags: false,
}
defer testUtil.MockUserInput(t, "y")()
Expand All @@ -280,11 +293,13 @@ func TestDeployWithDagsDeploySuccess(t *testing.T) {
defer os.RemoveAll("./testfiles/dags/")

mockClient.AssertExpectations(t)
mockCoreClient.AssertExpectations(t)
mockImageHandler.AssertExpectations(t)
mockContainerHandler.AssertExpectations(t)
}

func TestDagsDeploySuccess(t *testing.T) {
mockCoreClient := new(astrocore_mocks.ClientWithResponsesInterface)
mockDeplyResp := []astro.Deployment{
{
ID: "test-id",
Expand Down Expand Up @@ -320,15 +335,16 @@ func TestDagsDeploySuccess(t *testing.T) {
DeploymentName: "",
Prompt: true,
Dags: true,
WaitForStatus: false,
DagsPath: "./testfiles/dags",
}
testUtil.InitTestConfig(testUtil.LocalPlatform)
config.CFG.ShowWarnings.SetHomeString("false")
mockClient := new(astro_mocks.Client)

mockClient.On("GetDeploymentConfig").Return(astro.DeploymentConfig{RuntimeReleases: []astro.RuntimeRelease{{Version: "4.2.5"}}}, nil).Times(3)
mockClient.On("ListDeployments", mock.Anything, mock.Anything).Return(mockDeplyResp, nil).Times(4)
mockClient.On("InitiateDagDeployment", astro.InitiateDagDeploymentInput{RuntimeID: runtimeID}).Return(astro.InitiateDagDeployment{ID: initiatedDagDeploymentID, DagURL: dagURL}, nil).Times(4)
mockClient.On("ListDeployments", mock.Anything, mock.Anything).Return(mockDeplyResp, nil).Times(5)
mockClient.On("InitiateDagDeployment", astro.InitiateDagDeploymentInput{RuntimeID: runtimeID}).Return(astro.InitiateDagDeployment{ID: initiatedDagDeploymentID, DagURL: dagURL}, nil).Times(5)

azureUploader = func(sasLink string, file io.Reader) (string, error) {
return "version-id", nil
Expand All @@ -342,7 +358,7 @@ func TestDagsDeploySuccess(t *testing.T) {
Status: "SUCCEEDED",
Message: "DAGs uploaded successfully",
}
mockClient.On("ReportDagDeploymentStatus", reportDagDeploymentStatusInput).Return(astro.DagDeploymentStatus{}, nil).Times(4)
mockClient.On("ReportDagDeploymentStatus", reportDagDeploymentStatusInput).Return(astro.DagDeploymentStatus{}, nil).Times(5)

defer testUtil.MockUserInput(t, "y")()
err := Deploy(deployInput, mockClient, mockCoreClient)
Expand Down Expand Up @@ -380,15 +396,25 @@ func TestDagsDeploySuccess(t *testing.T) {
err = Deploy(deployInput, mockClient, mockCoreClient)
assert.NoError(t, err)

defer testUtil.MockUserInput(t, "y")()
deployInput.Pytest = ""
deployInput.WaitForStatus = true
dagOnlyDeploySleepTime = 1
timeoutNum = 1
err = Deploy(deployInput, mockClient, mockCoreClient)
assert.ErrorContains(t, err, "timed out waiting for the deployment to become healthy")

defer os.RemoveAll("./testfiles/dags/")

mockCoreClient.AssertExpectations(t)
mockClient.AssertExpectations(t)
}

func TestNoDagsDeploy(t *testing.T) {
testUtil.InitTestConfig(testUtil.LocalPlatform)
config.CFG.ShowWarnings.SetHomeString("true")
mockClient := new(astro_mocks.Client)
mockCoreClient := new(astrocore_mocks.ClientWithResponsesInterface)

ctx, err := config.GetCurrentContext()
assert.NoError(t, err)
Expand Down Expand Up @@ -432,18 +458,21 @@ func TestNoDagsDeploy(t *testing.T) {
ImageName: "",
DeploymentName: "",
Prompt: true,
WaitForStatus: false,
Dags: true,
}
err = Deploy(deployInput, mockClient, mockCoreClient)
assert.NoError(t, err)

mockClient.AssertExpectations(t)
mockCoreClient.AssertExpectations(t)
}

func TestDagsDeployFailed(t *testing.T) {
testUtil.InitTestConfig(testUtil.LocalPlatform)
config.CFG.ShowWarnings.SetHomeString("false")
mockClient := new(astro_mocks.Client)
mockCoreClient := new(astrocore_mocks.ClientWithResponsesInterface)

mockDeplyResp := []astro.Deployment{
{
Expand Down Expand Up @@ -479,6 +508,7 @@ func TestDagsDeployFailed(t *testing.T) {
ImageName: "",
DeploymentName: "",
Prompt: true,
WaitForStatus: false,
Dags: true,
}
mockClient.On("ListDeployments", mock.Anything, mock.Anything).Return(mockDeplyResp, nil).Times(3)
Expand Down Expand Up @@ -513,6 +543,7 @@ func TestDagsDeployFailed(t *testing.T) {
assert.Error(t, err)

mockClient.AssertExpectations(t)
mockCoreClient.AssertExpectations(t)
}

func TestDeployFailure(t *testing.T) {
Expand All @@ -526,6 +557,7 @@ func TestDeployFailure(t *testing.T) {
testUtil.InitTestConfig(testUtil.CloudPlatform)
err := config.ResetCurrentContext()
assert.NoError(t, err)
mockCoreClient := new(astrocore_mocks.ClientWithResponsesInterface)

deployInput := InputDeploy{
Path: "./testfiles/",
Expand All @@ -536,6 +568,7 @@ func TestDeployFailure(t *testing.T) {
ImageName: "",
DeploymentName: "",
Prompt: true,
WaitForStatus: false,
Dags: false,
}

Expand Down Expand Up @@ -608,6 +641,7 @@ func TestDeployFailure(t *testing.T) {
assert.ErrorIs(t, err, envFileMissing)

mockClient.AssertExpectations(t)
mockCoreClient.AssertExpectations(t)
mockImageHandler.AssertExpectations(t)
mockContainerHandler.AssertExpectations(t)
}
Expand Down
6 changes: 3 additions & 3 deletions cloud/deployment/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ func Create(label, workspaceID, description, clusterID, runtimeVersion, dagDeplo
}

if waitForStatus {
err = healthPoll(d.ID, workspaceID, coreClient)
err = HealthPoll(d.ID, workspaceID, sleepTime, tickNum, timeoutNum, coreClient)
if err != nil {
errOutput := createOutput(workspaceID, clusterType, &d)
if errOutput != nil {
Expand Down Expand Up @@ -547,8 +547,8 @@ func useSharedClusterOrSelectDedicatedCluster(cloudProvider, region, organizatio
return derivedClusterID, nil
}

func healthPoll(deploymentID, ws string, coreClient astrocore.CoreClient) error {
fmt.Printf("Waiting for the deployment to become healthy…\n\nThis may take a few minutes\n")
func HealthPoll(deploymentID, ws string, sleepTime, tickNum, timeoutNum int, coreClient astrocore.CoreClient) error {
fmt.Printf("\nWaiting for the deployment to become healthy…\n\nThis may take a few minutes\n")
time.Sleep(time.Duration(sleepTime) * time.Second)
buf := new(bytes.Buffer)
timeout := time.After(time.Duration(timeoutNum) * time.Second)
Expand Down
3 changes: 3 additions & 0 deletions cmd/cloud/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ var (
pytest bool
parse bool
dags bool
waitForDeploy bool
dagsPath string
deployExample = `
Specify the ID of the Deployment on Astronomer you would like to deploy this project to:
Expand Down Expand Up @@ -67,6 +68,7 @@ func NewDeployCmd() *cobra.Command {
cmd.Flags().StringVar(&dagsPath, "dags-path", "", "If set deploy dags from this path instead of the dags from working directory")
cmd.Flags().StringVarP(&deploymentName, "deployment-name", "n", "", "Name of the deployment to deploy to")
cmd.Flags().BoolVar(&parse, "parse", false, "Succeed only if all DAGs in your Astro project parse without errors")
cmd.Flags().BoolVarP(&waitForDeploy, "wait", "w", false, "Wait for the Deployment to become healthy before ending the command")
cmd.Flags().MarkHidden("dags-path") //nolint:errcheck
return cmd
}
Expand Down Expand Up @@ -136,6 +138,7 @@ func deploy(cmd *cobra.Command, args []string) error {
DeploymentName: deploymentName,
Prompt: forcePrompt,
Dags: dags,
WaitForStatus: waitForDeploy,
DagsPath: dagsPath,
}

Expand Down
6 changes: 6 additions & 0 deletions cmd/cloud/deploy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ func TestDeployImage(t *testing.T) {
err := execDeployCmd([]string{"-f"}...)
assert.NoError(t, err)

err = execDeployCmd([]string{"test-deployment-id", "-f", "--wait"}...)
assert.NoError(t, err)

err = execDeployCmd([]string{"test-deployment-id", "--save"}...)
assert.NoError(t, err)

Expand All @@ -51,6 +54,9 @@ func TestDeployImage(t *testing.T) {
err = execDeployCmd([]string{"test-deployment-id", "--dags"}...)
assert.NoError(t, err)

err = execDeployCmd([]string{"test-deployment-id", "--dags", "--wait"}...)
assert.NoError(t, err)

err = execDeployCmd([]string{"-f", "test-deployment-id", "--dags", "--pytest"}...)
assert.NoError(t, err)

Expand Down

0 comments on commit f4e6650

Please sign in to comment.