From f776a7787b62fa26c6f7d29a2a9ffb4b44f59b4e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nick=20M=C3=BCller?= Date: Wed, 13 Apr 2022 13:43:17 +0200 Subject: [PATCH 1/5] Fixed minor log message formatting MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Nick Müller --- pkg/rpc/adminservice/launch_plan.go | 4 ++-- pkg/rpc/adminservice/node_execution.go | 2 +- pkg/rpc/adminservice/task.go | 2 +- pkg/rpc/adminservice/task_execution.go | 4 ++-- pkg/rpc/adminservice/workflow.go | 2 +- 5 files changed, 7 insertions(+), 7 deletions(-) diff --git a/pkg/rpc/adminservice/launch_plan.go b/pkg/rpc/adminservice/launch_plan.go index 6a7dae4f2..0ee96f1ff 100644 --- a/pkg/rpc/adminservice/launch_plan.go +++ b/pkg/rpc/adminservice/launch_plan.go @@ -38,7 +38,7 @@ func (m *AdminService) GetLaunchPlan(ctx context.Context, request *admin.ObjectG // NOTE: When the Get HTTP endpoint is called the resource type is implicit (from the URL) so we must add it // to the request. if request.Id != nil && request.Id.ResourceType == core.ResourceType_UNSPECIFIED { - logger.Info(ctx, "Adding resource type for unspecified value in request: [%+v]", request) + logger.Infof(ctx, "Adding resource type for unspecified value in request: [%+v]", request) request.Id.ResourceType = core.ResourceType_LAUNCH_PLAN } var response *admin.LaunchPlan @@ -80,7 +80,7 @@ func (m *AdminService) UpdateLaunchPlan(ctx context.Context, request *admin.Laun // NOTE: When the Get HTTP endpoint is called the resource type is implicit (from the URL) so we must add it // to the request. if request.Id != nil && request.Id.ResourceType == core.ResourceType_UNSPECIFIED { - logger.Info(ctx, "Adding resource type for unspecified value in request: [%+v]", request) + logger.Infof(ctx, "Adding resource type for unspecified value in request: [%+v]", request) request.Id.ResourceType = core.ResourceType_LAUNCH_PLAN } var response *admin.LaunchPlanUpdateResponse diff --git a/pkg/rpc/adminservice/node_execution.go b/pkg/rpc/adminservice/node_execution.go index c5479eda2..b4cabedf3 100644 --- a/pkg/rpc/adminservice/node_execution.go +++ b/pkg/rpc/adminservice/node_execution.go @@ -76,7 +76,7 @@ func (m *AdminService) ListNodeExecutionsForTask( // to the request. if request.TaskExecutionId != nil && request.TaskExecutionId.TaskId != nil && request.TaskExecutionId.TaskId.ResourceType == core.ResourceType_UNSPECIFIED { - logger.Info(ctx, "Adding resource type for unspecified value in request: [%+v]", request) + logger.Infof(ctx, "Adding resource type for unspecified value in request: [%+v]", request) request.TaskExecutionId.TaskId.ResourceType = core.ResourceType_TASK } var response *admin.NodeExecutionList diff --git a/pkg/rpc/adminservice/task.go b/pkg/rpc/adminservice/task.go index d1d4089ee..8ab1463fc 100644 --- a/pkg/rpc/adminservice/task.go +++ b/pkg/rpc/adminservice/task.go @@ -39,7 +39,7 @@ func (m *AdminService) GetTask(ctx context.Context, request *admin.ObjectGetRequ // NOTE: When the Get HTTP endpoint is called the resource type is implicit (from the URL) so we must add it // to the request. if request.Id != nil && request.Id.ResourceType == core.ResourceType_UNSPECIFIED { - logger.Info(ctx, "Adding resource type for unspecified value in request: [%+v]", request) + logger.Infof(ctx, "Adding resource type for unspecified value in request: [%+v]", request) request.Id.ResourceType = core.ResourceType_TASK } var response *admin.Task diff --git a/pkg/rpc/adminservice/task_execution.go b/pkg/rpc/adminservice/task_execution.go index a45b67106..b2b0d5f00 100644 --- a/pkg/rpc/adminservice/task_execution.go +++ b/pkg/rpc/adminservice/task_execution.go @@ -41,7 +41,7 @@ func (m *AdminService) GetTaskExecution( // NOTE: When the Get HTTP endpoint is called the resource type is implicit (from the URL) so we must add it // to the request. if request.Id != nil && request.Id.TaskId != nil && request.Id.TaskId.ResourceType == core.ResourceType_UNSPECIFIED { - logger.Info(ctx, "Adding resource type for unspecified value in request: [%+v]", request) + logger.Infof(ctx, "Adding resource type for unspecified value in request: [%+v]", request) request.Id.TaskId.ResourceType = core.ResourceType_TASK } if err := validation.ValidateTaskExecutionIdentifier(request.Id); err != nil { @@ -91,7 +91,7 @@ func (m *AdminService) GetTaskExecutionData( // NOTE: When the Get HTTP endpoint is called the resource type is implicit (from the URL) so we must add it // to the request. if request.Id != nil && request.Id.TaskId != nil && request.Id.TaskId.ResourceType == core.ResourceType_UNSPECIFIED { - logger.Info(ctx, "Adding resource type for unspecified value in request: [%+v]", request) + logger.Infof(ctx, "Adding resource type for unspecified value in request: [%+v]", request) request.Id.TaskId.ResourceType = core.ResourceType_TASK } var response *admin.TaskExecutionGetDataResponse diff --git a/pkg/rpc/adminservice/workflow.go b/pkg/rpc/adminservice/workflow.go index ae9c5ceb5..dad930ce9 100644 --- a/pkg/rpc/adminservice/workflow.go +++ b/pkg/rpc/adminservice/workflow.go @@ -40,7 +40,7 @@ func (m *AdminService) GetWorkflow(ctx context.Context, request *admin.ObjectGet // NOTE: When the Get HTTP endpoint is called the resource type is implicit (from the URL) so we must add it // to the request. if request.Id != nil && request.Id.ResourceType == core.ResourceType_UNSPECIFIED { - logger.Info(ctx, "Adding resource type for unspecified value in request: [%+v]", request) + logger.Infof(ctx, "Adding resource type for unspecified value in request: [%+v]", request) request.Id.ResourceType = core.ResourceType_WORKFLOW } var response *admin.Workflow From 6ce610ee6174b3529e4a2397b2bc650a3557734f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nick=20M=C3=BCller?= Date: Wed, 20 Apr 2022 16:20:23 +0200 Subject: [PATCH 2/5] Added override for interruptible flag to execution config MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Nick Müller --- go.mod | 10 +- go.sum | 19 +- pkg/manager/impl/execution_manager.go | 8 + pkg/manager/impl/execution_manager_test.go | 329 +++++++++++++++++- .../interfaces/application_configuration.go | 6 + pkg/workflowengine/impl/prepare_execution.go | 1 + .../impl/prepare_execution_test.go | 10 + 7 files changed, 370 insertions(+), 13 deletions(-) diff --git a/go.mod b/go.mod index 1d6186bf4..f0f30759b 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,11 @@ module github.com/flyteorg/flyteadmin go 1.18 +replace ( + github.com/flyteorg/flyteidl => github.com/MorpheusXAUT/flyteidl v0.24.22-0.20220414153602-81257a627c30 + github.com/flyteorg/flytepropeller => github.com/MorpheusXAUT/flytepropeller v0.16.48-0.20220421090510-afc31b9b41e8 +) + require ( cloud.google.com/go/iam v0.1.0 cloud.google.com/go/storage v1.14.0 @@ -13,8 +18,8 @@ require ( github.com/cloudevents/sdk-go/v2 v2.8.0 github.com/coreos/go-oidc v2.2.1+incompatible github.com/evanphx/json-patch v4.9.0+incompatible - github.com/flyteorg/flyteidl v0.24.17 - github.com/flyteorg/flyteplugins v0.10.16 + github.com/flyteorg/flyteidl v0.24.19 + github.com/flyteorg/flyteplugins v0.10.24 github.com/flyteorg/flytepropeller v0.16.36 github.com/flyteorg/flytestdlib v0.4.23 github.com/flyteorg/stow v0.3.3 @@ -176,6 +181,7 @@ require ( gopkg.in/square/go-jose.v2 v2.5.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect + k8s.io/apiextensions-apiserver v0.20.1 // indirect sigs.k8s.io/yaml v1.2.0 // indirect ) diff --git a/go.sum b/go.sum index b1018df1d..150d15d1b 100644 --- a/go.sum +++ b/go.sum @@ -133,6 +133,10 @@ github.com/Masterminds/semver/v3 v3.1.1 h1:hLg3sBzpNErnxhQtUy/mmLR2I9foDujNK030I github.com/Masterminds/semver/v3 v3.1.1/go.mod h1:VPu/7SZ7ePZ3QOrcuXROw5FAcLl4a0cBrbBpGY/8hQs= github.com/Microsoft/go-winio v0.4.11/go.mod h1:VhR8bwka0BXejwEJY73c50VrPtXAaKcyvVC4A4RozmA= github.com/Microsoft/go-winio v0.4.14/go.mod h1:qXqCSQ3Xa7+6tgxaGTIe4Kpcdsi+P8jBhyzoq1bpyYA= +github.com/MorpheusXAUT/flyteidl v0.24.22-0.20220414153602-81257a627c30 h1:NvHNJm+IcLBOb5Te3trSs7KJTqLJCMkcLgZ9WmxLQoA= +github.com/MorpheusXAUT/flyteidl v0.24.22-0.20220414153602-81257a627c30/go.mod h1:vHSugApgS3hRITIafzQDU8DZD/W8wFRfFcgaFU35Dww= +github.com/MorpheusXAUT/flytepropeller v0.16.48-0.20220421090510-afc31b9b41e8 h1:LXT81FQ5DeKGeid8qDPGg3i05TWtAUoIWwlY23pWWwg= +github.com/MorpheusXAUT/flytepropeller v0.16.48-0.20220421090510-afc31b9b41e8/go.mod h1:QBC3GoCUY0q4k69vXUBVnJZZ3XxkKHLj8KnmB5zsfF4= github.com/NYTimes/gizmo v1.3.6 h1:K+GRagPdAxojsT1TlTQlMkTeOmgfLxSdvuOhdki7GG0= github.com/NYTimes/gizmo v1.3.6/go.mod h1:8S8QVnITA40p/1jGsUMcPI8R9SSKkoKu+8WF13s9Uhw= github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ= @@ -367,16 +371,11 @@ github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94= github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ= github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= -github.com/flyteorg/flyteidl v0.23.0/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= -github.com/flyteorg/flyteidl v0.24.0/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= -github.com/flyteorg/flyteidl v0.24.17 h1:Xx70bJbuQGyvS8uAyU4AN74rot6KnzJ9r/L9gcCdEsU= -github.com/flyteorg/flyteidl v0.24.17/go.mod h1:vHSugApgS3hRITIafzQDU8DZD/W8wFRfFcgaFU35Dww= -github.com/flyteorg/flyteplugins v0.10.16 h1:rwNI2MACPbcST2O6CEUsNW6bccz7ZLni0GiY3orevfw= -github.com/flyteorg/flyteplugins v0.10.16/go.mod h1:YBWV8QnFakDJfLyua8pYddiWqszAqseBKIJPNMERlos= -github.com/flyteorg/flytepropeller v0.16.36 h1:5uE8JsutrPVyLVrRJ8BgvhZUOmTBFkEkn5xmIOo21nU= -github.com/flyteorg/flytepropeller v0.16.36/go.mod h1:DGCjQSRp8VYOBH56aQyAZfNf1Vgh+GNpwQL7uhottYM= +github.com/flyteorg/flyteplugins v0.10.24 h1:hyabJ3BywcCLPfUu/8WC3lKWbY72aoxcAx8uF7dWSBM= +github.com/flyteorg/flyteplugins v0.10.24/go.mod h1:12hTsHaGNKU9BVpTGcxtiL+Zrf5sfDXiDDsPvEO40CQ= github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220= github.com/flyteorg/flytestdlib v0.4.13/go.mod h1:fv1ar34LJLMTaf0tbfetisLykUlARi7rP+NQTUn6QQs= +github.com/flyteorg/flytestdlib v0.4.22/go.mod h1:QSVN5wIM1lM9d60eAEbX7NwweQXW96t5x4jbyftn89c= github.com/flyteorg/flytestdlib v0.4.23 h1:REDpbqzhvuT4biIXPHkWVHpEqs3OMVninB9YadhIPK0= github.com/flyteorg/flytestdlib v0.4.23/go.mod h1:QSVN5wIM1lM9d60eAEbX7NwweQXW96t5x4jbyftn89c= github.com/flyteorg/stow v0.3.3 h1:tzeNl8mSZFL3oJDi0ACZj6FAineQAF4qyEp6bXtIdQY= @@ -1555,6 +1554,7 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q= github.com/ziutek/mymysql v1.5.4/go.mod h1:LMSpPZ6DbqWFxNCHW77HeMg9I646SAhApZ/wKdgO/C0= go.elastic.co/apm v1.8.0/go.mod h1:tCw6CkOJgkWnzEthFN9HUP1uL3Gjc/Ur6m7gRPLaoH0= @@ -1706,6 +1706,7 @@ golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.5.0/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro= golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 h1:kQgndtyPBW/JIYERgdxfwMYh3AVStj88WQTlNDi2a+o= +golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3/go.mod h1:3p9vT2HGsQu2K1YbXdKPJLVgG5VJdoTa1poYQBtP1AY= golang.org/x/net v0.0.0-20170114055629-f2499483f923/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180218175443-cbe0f9307d01/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -1777,6 +1778,7 @@ golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20210610132358-84b48f89b13b/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 h1:CIJ76btIcR3eFI5EgSo6k1qKw9KJexJuRLI9G7Hp5wE= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -1924,6 +1926,7 @@ golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210908233432-aa78b53d3365/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211124211545-fe61309f8881/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211205182925-97ca703d548d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211210111614-af8b64212486/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/pkg/manager/impl/execution_manager.go b/pkg/manager/impl/execution_manager.go index 0bf5887d9..d3ef50699 100644 --- a/pkg/manager/impl/execution_manager.go +++ b/pkg/manager/impl/execution_manager.go @@ -447,6 +447,8 @@ type WorkflowExecutionConfigInterface interface { GetAnnotations() *admin.Annotations // GetLabels Custom labels to be applied to a triggered execution resource. GetLabels() *admin.Labels + // GetInterruptible indicates a workflow should be flagged as interruptible for a single execution. + GetInterruptible() bool } // Merge into workflowExecConfig from spec and return true if any value has been changed @@ -483,6 +485,12 @@ func mergeIntoExecConfig(workflowExecConfig *admin.WorkflowExecutionConfig, spec workflowExecConfig.Annotations = spec.GetAnnotations() isChanged = true } + + if !workflowExecConfig.GetInterruptible() && spec.GetInterruptible() { + workflowExecConfig.Interruptible = true + isChanged = true + } + return isChanged } diff --git a/pkg/manager/impl/execution_manager_test.go b/pkg/manager/impl/execution_manager_test.go index 6bc9c53da..33aa244ce 100644 --- a/pkg/manager/impl/execution_manager_test.go +++ b/pkg/manager/impl/execution_manager_test.go @@ -188,6 +188,27 @@ func setDefaultLpCallbackForExecTest(repository interfaces.Repository) { repository.LaunchPlanRepo().(*repositoryMocks.MockLaunchPlanRepo).SetGetCallback(lpGetFunc) } +func setDefaultTaskCallbackForExecTest(repository interfaces.Repository) { + taskGetFunc := func(input interfaces.Identifier) (models.Task, error) { + return models.Task{ + TaskKey: models.TaskKey{ + Project: input.Project, + Domain: input.Domain, + Name: input.Name, + Version: input.Version, + }, + BaseModel: models.BaseModel{ + ID: uint(123), + CreatedAt: testutils.MockCreatedAtValue, + }, + Closure: testutils.GetTaskClosureBytes(), + Digest: []byte(input.Name), + Type: "python", + }, nil + } + repository.TaskRepo().(*repositoryMocks.MockTaskRepo).SetGetCallback(taskGetFunc) +} + func getMockStorageForExecTest(ctx context.Context) *storage.DataStore { mockStorage := commonMocks.GetMockStorageClient() mockStorage.ComposedProtobufStore.(*commonMocks.TestDataStore).ReadProtobufCb = func( @@ -887,6 +908,151 @@ func TestCreateExecutionDynamicLabelsAndAnnotations(t *testing.T) { assert.Equal(t, expectedResponse, response) } +func TestCreateExecutionDefaultInterruptible(t *testing.T) { + t.Run("LaunchPlan", func(t *testing.T) { + // Interruptible flag of request execution spec defaults to false if omitted + request := testutils.GetExecutionRequest() + request.Spec.Interruptible = false + + repository := getMockRepositoryForExecTest() + setDefaultLpCallbackForExecTest(repository) + setDefaultTaskCallbackForExecTest(repository) + + exCreateFunc := func(ctx context.Context, input models.Execution) error { + var spec admin.ExecutionSpec + err := proto.Unmarshal(input.Spec, &spec) + assert.Nil(t, err) + assert.NotEqual(t, uint(0), input.LaunchPlanID) + assert.Equal(t, uint(0), input.TaskID) + assert.False(t, spec.Interruptible) + return nil + } + repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetCreateCallback(exCreateFunc) + + mockExecutor := workflowengineMocks.WorkflowExecutor{} + mockExecutor.OnExecuteMatch(mock.Anything, mock.Anything, mock.Anything).Return(workflowengineInterfaces.ExecutionResponse{}, nil) + mockExecutor.OnID().Return("testMockExecutor") + r := plugins.NewRegistry() + r.RegisterDefault(plugins.PluginIDWorkflowExecutor, &mockExecutor) + execManager := NewExecutionManager(repository, r, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{}) + + response, err := execManager.CreateExecution(context.Background(), request, requestedAt) + assert.Nil(t, err) + assert.True(t, proto.Equal(&core.WorkflowExecutionIdentifier{ + Project: "project", + Domain: "domain", + Name: "name", + }, response.Id)) + }) + + t.Run("Task", func(t *testing.T) { + // Interruptible flag of request execution spec defaults to false if omitted + request := testutils.GetExecutionRequest() + request.Spec.LaunchPlan.ResourceType = core.ResourceType_TASK + request.Spec.Interruptible = false + + repository := getMockRepositoryForExecTest() + setDefaultLpCallbackForExecTest(repository) + setDefaultTaskCallbackForExecTest(repository) + + exCreateFunc := func(ctx context.Context, input models.Execution) error { + var spec admin.ExecutionSpec + err := proto.Unmarshal(input.Spec, &spec) + assert.Nil(t, err) + assert.Equal(t, uint(0), input.LaunchPlanID) + assert.NotEqual(t, uint(0), input.TaskID) + assert.False(t, spec.Interruptible) + return nil + } + repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetCreateCallback(exCreateFunc) + mockExecutor := workflowengineMocks.WorkflowExecutor{} + mockExecutor.OnExecuteMatch(mock.Anything, mock.Anything, mock.Anything).Return(workflowengineInterfaces.ExecutionResponse{}, nil) + mockExecutor.OnID().Return("testMockExecutor") + r := plugins.NewRegistry() + r.RegisterDefault(plugins.PluginIDWorkflowExecutor, &mockExecutor) + execManager := NewExecutionManager(repository, r, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{}) + + response, err := execManager.CreateExecution(context.Background(), request, requestedAt) + assert.Nil(t, err) + assert.True(t, proto.Equal(&core.WorkflowExecutionIdentifier{ + Project: "project", + Domain: "domain", + Name: "name", + }, response.Id)) + }) +} + +func TestCreateExecutionInterruptibleOverride(t *testing.T) { + t.Run("LaunchPlan", func(t *testing.T) { + request := testutils.GetExecutionRequest() + request.Spec.Interruptible = true + + repository := getMockRepositoryForExecTest() + setDefaultLpCallbackForExecTest(repository) + setDefaultTaskCallbackForExecTest(repository) + + exCreateFunc := func(ctx context.Context, input models.Execution) error { + var spec admin.ExecutionSpec + err := proto.Unmarshal(input.Spec, &spec) + assert.Nil(t, err) + assert.NotEqual(t, uint(0), input.LaunchPlanID) + assert.Equal(t, uint(0), input.TaskID) + assert.True(t, spec.Interruptible) + return nil + } + repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetCreateCallback(exCreateFunc) + mockExecutor := workflowengineMocks.WorkflowExecutor{} + mockExecutor.OnExecuteMatch(mock.Anything, mock.Anything, mock.Anything).Return(workflowengineInterfaces.ExecutionResponse{}, nil) + mockExecutor.OnID().Return("testMockExecutor") + r := plugins.NewRegistry() + r.RegisterDefault(plugins.PluginIDWorkflowExecutor, &mockExecutor) + execManager := NewExecutionManager(repository, r, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{}) + + response, err := execManager.CreateExecution(context.Background(), request, requestedAt) + assert.Nil(t, err) + assert.True(t, proto.Equal(&core.WorkflowExecutionIdentifier{ + Project: "project", + Domain: "domain", + Name: "name", + }, response.Id)) + }) + + t.Run("Task", func(t *testing.T) { + request := testutils.GetExecutionRequest() + request.Spec.LaunchPlan.ResourceType = core.ResourceType_TASK + request.Spec.Interruptible = true + + repository := getMockRepositoryForExecTest() + setDefaultLpCallbackForExecTest(repository) + setDefaultTaskCallbackForExecTest(repository) + + exCreateFunc := func(ctx context.Context, input models.Execution) error { + var spec admin.ExecutionSpec + err := proto.Unmarshal(input.Spec, &spec) + assert.Nil(t, err) + assert.Equal(t, uint(0), input.LaunchPlanID) + assert.NotEqual(t, uint(0), input.TaskID) + assert.True(t, spec.Interruptible) + return nil + } + repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetCreateCallback(exCreateFunc) + mockExecutor := workflowengineMocks.WorkflowExecutor{} + mockExecutor.OnExecuteMatch(mock.Anything, mock.Anything, mock.Anything).Return(workflowengineInterfaces.ExecutionResponse{}, nil) + mockExecutor.OnID().Return("testMockExecutor") + r := plugins.NewRegistry() + r.RegisterDefault(plugins.PluginIDWorkflowExecutor, &mockExecutor) + execManager := NewExecutionManager(repository, r, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{}) + + response, err := execManager.CreateExecution(context.Background(), request, requestedAt) + assert.Nil(t, err) + assert.True(t, proto.Equal(&core.WorkflowExecutionIdentifier{ + Project: "project", + Domain: "domain", + Name: "name", + }, response.Id)) + }) +} + func makeExecutionGetFunc( t *testing.T, closureBytes []byte, startTime *time.Time) repositoryMocks.GetExecutionFunc { return func(ctx context.Context, input interfaces.Identifier) (models.Execution, error) { @@ -939,6 +1105,39 @@ func makeLegacyExecutionGetFunc( } } +func makeExecutionInterruptibleGetFunc( + t *testing.T, closureBytes []byte, startTime *time.Time, interruptible bool) repositoryMocks.GetExecutionFunc { + return func(ctx context.Context, input interfaces.Identifier) (models.Execution, error) { + assert.Equal(t, "project", input.Project) + assert.Equal(t, "domain", input.Domain) + assert.Equal(t, "name", input.Name) + + request := testutils.GetExecutionRequest() + request.Spec.Interruptible = interruptible + + specBytes, err := proto.Marshal(request.Spec) + assert.Nil(t, err) + + return models.Execution{ + ExecutionKey: models.ExecutionKey{ + Project: "project", + Domain: "domain", + Name: "name", + }, + BaseModel: models.BaseModel{ + ID: uint(8), + }, + Spec: specBytes, + Phase: core.WorkflowExecution_QUEUED.String(), + Closure: closureBytes, + LaunchPlanID: uint(1), + WorkflowID: uint(2), + StartedAt: startTime, + Cluster: testCluster, + }, nil + } +} + func TestRelaunchExecution(t *testing.T) { // Set up mocks. repository := getMockRepositoryForExecTest() @@ -1078,6 +1277,69 @@ func TestRelaunchExecution_CreateFailure(t *testing.T) { assert.EqualError(t, err, expectedErr.Error()) } +func TestRelaunchExecutionInterruptibleOverride(t *testing.T) { + // Set up mocks. + repository := getMockRepositoryForExecTest() + setDefaultLpCallbackForExecTest(repository) + mockExecutor := workflowengineMocks.WorkflowExecutor{} + mockExecutor.OnExecuteMatch(mock.Anything, mock.Anything, mock.Anything).Return(workflowengineInterfaces.ExecutionResponse{}, nil) + mockExecutor.OnID().Return("testMockExecutor") + r := plugins.NewRegistry() + r.RegisterDefault(plugins.PluginIDWorkflowExecutor, &mockExecutor) + execManager := NewExecutionManager(repository, r, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{}) + startTime := time.Now() + startTimeProto, _ := ptypes.TimestampProto(startTime) + existingClosure := admin.ExecutionClosure{ + Phase: core.WorkflowExecution_RUNNING, + StartedAt: startTimeProto, + } + existingClosureBytes, _ := proto.Marshal(&existingClosure) + executionGetFunc := makeExecutionInterruptibleGetFunc(t, existingClosureBytes, &startTime, true) + repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetGetCallback(executionGetFunc) + + var createCalled bool + exCreateFunc := func(ctx context.Context, input models.Execution) error { + createCalled = true + assert.Equal(t, "relaunchy", input.Name) + assert.Equal(t, "domain", input.Domain) + assert.Equal(t, "project", input.Project) + assert.Equal(t, uint(8), input.SourceExecutionID) + var spec admin.ExecutionSpec + err := proto.Unmarshal(input.Spec, &spec) + assert.Nil(t, err) + assert.Equal(t, admin.ExecutionMetadata_RELAUNCH, spec.Metadata.Mode) + assert.Equal(t, int32(admin.ExecutionMetadata_RELAUNCH), input.Mode) + assert.True(t, spec.Interruptible) + return nil + } + repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetCreateCallback(exCreateFunc) + + // Issue request. + response, err := execManager.RelaunchExecution(context.Background(), admin.ExecutionRelaunchRequest{ + Id: &core.WorkflowExecutionIdentifier{ + Project: "project", + Domain: "domain", + Name: "name", + }, + Name: "relaunchy", + }, requestedAt) + + // And verify response. + assert.Nil(t, err) + + expectedResponse := &admin.ExecutionCreateResponse{ + Id: &core.WorkflowExecutionIdentifier{ + Project: "project", + Domain: "domain", + Name: "relaunchy", + }, + } + assert.True(t, createCalled) + assert.True(t, proto.Equal(expectedResponse, response)) + + // TODO: Test with inputs +} + func TestRecoverExecution(t *testing.T) { // Set up mocks. repository := getMockRepositoryForExecTest() @@ -1315,6 +1577,67 @@ func TestRecoverExecution_GetExistingInputsFailure(t *testing.T) { assert.EqualError(t, err, "Unable to read WorkflowClosure from location s3://flyte/metadata/admin/remote closure id : foo") } +func TestRecoverExecutionInterruptibleOverride(t *testing.T) { + // Set up mocks. + repository := getMockRepositoryForExecTest() + setDefaultLpCallbackForExecTest(repository) + mockExecutor := workflowengineMocks.WorkflowExecutor{} + mockExecutor.OnExecuteMatch(mock.Anything, mock.Anything, mock.Anything).Return(workflowengineInterfaces.ExecutionResponse{}, nil) + mockExecutor.OnID().Return("testMockExecutor") + r := plugins.NewRegistry() + r.RegisterDefault(plugins.PluginIDWorkflowExecutor, &mockExecutor) + execManager := NewExecutionManager(repository, r, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{}) + startTime := time.Now() + startTimeProto, _ := ptypes.TimestampProto(startTime) + existingClosure := admin.ExecutionClosure{ + Phase: core.WorkflowExecution_SUCCEEDED, + StartedAt: startTimeProto, + } + existingClosureBytes, _ := proto.Marshal(&existingClosure) + executionGetFunc := makeExecutionInterruptibleGetFunc(t, existingClosureBytes, &startTime, true) + repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetGetCallback(executionGetFunc) + + var createCalled bool + exCreateFunc := func(ctx context.Context, input models.Execution) error { + createCalled = true + assert.Equal(t, "recovered", input.Name) + assert.Equal(t, "domain", input.Domain) + assert.Equal(t, "project", input.Project) + assert.Equal(t, uint(8), input.SourceExecutionID) + var spec admin.ExecutionSpec + err := proto.Unmarshal(input.Spec, &spec) + assert.Nil(t, err) + assert.Equal(t, admin.ExecutionMetadata_RECOVERED, spec.Metadata.Mode) + assert.Equal(t, int32(admin.ExecutionMetadata_RECOVERED), input.Mode) + assert.True(t, spec.Interruptible) + return nil + } + repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetCreateCallback(exCreateFunc) + + // Issue request. + response, err := execManager.RecoverExecution(context.Background(), admin.ExecutionRecoverRequest{ + Id: &core.WorkflowExecutionIdentifier{ + Project: "project", + Domain: "domain", + Name: "name", + }, + Name: "recovered", + }, requestedAt) + + // And verify response. + assert.Nil(t, err) + + expectedResponse := &admin.ExecutionCreateResponse{ + Id: &core.WorkflowExecutionIdentifier{ + Project: "project", + Domain: "domain", + Name: "recovered", + }, + } + assert.True(t, createCalled) + assert.True(t, proto.Equal(expectedResponse, response)) +} + func TestCreateWorkflowEvent(t *testing.T) { repository := repositoryMocks.NewMockRepository() startTime := time.Now() @@ -2296,7 +2619,7 @@ func TestExecutionManager_PublishNotifications(t *testing.T) { workflowRequest := admin.WorkflowExecutionEventRequest{ Event: &event.WorkflowExecutionEvent{ Phase: core.WorkflowExecution_FAILED, - //ExecutionId: "1234", + // ExecutionId: "1234", OutputResult: &event.WorkflowExecutionEvent_Error{ Error: &core.ExecutionError{ Code: "CodeBad", @@ -2378,7 +2701,7 @@ func TestExecutionManager_PublishNotificationsTransformError(t *testing.T) { workflowRequest := admin.WorkflowExecutionEventRequest{ Event: &event.WorkflowExecutionEvent{ Phase: core.WorkflowExecution_FAILED, - //ExecutionId: "1234", + // ExecutionId: "1234", OutputResult: &event.WorkflowExecutionEvent_Error{ Error: &core.ExecutionError{ Code: "CodeBad", @@ -2437,7 +2760,7 @@ func TestExecutionManager_TestExecutionManager_PublishNotificationsTransformErro workflowRequest := admin.WorkflowExecutionEventRequest{ Event: &event.WorkflowExecutionEvent{ Phase: core.WorkflowExecution_FAILED, - //ExecutionId: "1234", + // ExecutionId: "1234", OutputResult: &event.WorkflowExecutionEvent_Error{ Error: &core.ExecutionError{ Code: "CodeBad", diff --git a/pkg/runtime/interfaces/application_configuration.go b/pkg/runtime/interfaces/application_configuration.go index 671c709a3..fda0e9f34 100644 --- a/pkg/runtime/interfaces/application_configuration.go +++ b/pkg/runtime/interfaces/application_configuration.go @@ -72,6 +72,8 @@ type ApplicationConfig struct { Labels map[string]string `json:"labels,omitempty"` // Annotations to apply to the execution resource. Annotations map[string]string `json:"annotations,omitempty"` + // Interruptible indicates whether all tasks should be run as interruptible by default (unless specified otherwise via the execution/workflow/task definition) + Interruptible bool `json:"interruptible"` // Optional: security context override to apply this execution. // iam_role references the fully qualified name of Identity & Access Management role to impersonate. @@ -138,6 +140,10 @@ func (a *ApplicationConfig) GetLabels() *admin.Labels { } } +func (a *ApplicationConfig) GetInterruptible() bool { + return a.Interruptible +} + // This section holds common config for AWS type AWSConfig struct { Region string `json:"region"` diff --git a/pkg/workflowengine/impl/prepare_execution.go b/pkg/workflowengine/impl/prepare_execution.go index fc4745e30..6e48418f8 100644 --- a/pkg/workflowengine/impl/prepare_execution.go +++ b/pkg/workflowengine/impl/prepare_execution.go @@ -56,6 +56,7 @@ func addExecutionOverrides(taskPluginOverrides []*admin.PluginOverride, } if workflowExecutionConfig != nil { executionConfig.MaxParallelism = uint32(workflowExecutionConfig.MaxParallelism) + executionConfig.Interruptible = workflowExecutionConfig.Interruptible } if taskResources != nil { var requests = v1alpha1.TaskResourceSpec{} diff --git a/pkg/workflowengine/impl/prepare_execution_test.go b/pkg/workflowengine/impl/prepare_execution_test.go index 674f0b983..6601dfab2 100644 --- a/pkg/workflowengine/impl/prepare_execution_test.go +++ b/pkg/workflowengine/impl/prepare_execution_test.go @@ -146,6 +146,14 @@ func TestAddExecutionOverrides(t *testing.T) { GPU: resource.MustParse("1"), }, workflow.ExecutionConfig.TaskResources.Limits) }) + t.Run("interruptible", func(t *testing.T) { + workflowExecutionConfig := &admin.WorkflowExecutionConfig{ + Interruptible: true, + } + workflow := &v1alpha1.FlyteWorkflow{} + addExecutionOverrides(nil, workflowExecutionConfig, nil, nil, workflow) + assert.True(t, workflow.ExecutionConfig.Interruptible) + }) } func TestPrepareFlyteWorkflow(t *testing.T) { @@ -187,6 +195,7 @@ func TestPrepareFlyteWorkflow(t *testing.T) { K8SServiceAccount: testK8sServiceAccountSc, }, }, + Interruptible: true, }, RecoveryExecution: recoveryNodeExecutionID, EventVersion: 1, @@ -218,6 +227,7 @@ func TestPrepareFlyteWorkflow(t *testing.T) { }, flyteWorkflow.ExecutionConfig.TaskPluginImpls) assert.Equal(t, flyteWorkflow.ServiceAccountName, testK8sServiceAccountSc) assert.Equal(t, flyteWorkflow.ExecutionConfig.MaxParallelism, uint32(50)) + assert.True(t, flyteWorkflow.ExecutionConfig.Interruptible) assert.True(t, proto.Equal(recoveryNodeExecutionID, flyteWorkflow.ExecutionConfig.RecoveryExecution.WorkflowExecutionIdentifier)) assert.Equal(t, flyteWorkflow.WorkflowMeta.EventVersion, v1alpha1.EventVersion(1)) assert.Equal(t, flyteWorkflow.RawOutputDataConfig, v1alpha1.RawOutputDataConfig{ From 87dfa5b99b548044daf253392b96e20ca6553467 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nick=20M=C3=BCller?= Date: Tue, 26 Apr 2022 11:08:27 +0200 Subject: [PATCH 3/5] Interruptible override now uses BoolValue wrapper Allows for distinguishment between no value being provided and an override to false MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Nick Müller --- go.mod | 9 +- go.sum | 8 +- pkg/manager/impl/execution_manager.go | 14 +- pkg/manager/impl/execution_manager_test.go | 272 ++++++++---------- .../interfaces/application_configuration.go | 12 +- pkg/workflowengine/impl/prepare_execution.go | 5 +- .../impl/prepare_execution_test.go | 11 +- 7 files changed, 151 insertions(+), 180 deletions(-) diff --git a/go.mod b/go.mod index f0f30759b..c1b3b7842 100644 --- a/go.mod +++ b/go.mod @@ -2,11 +2,6 @@ module github.com/flyteorg/flyteadmin go 1.18 -replace ( - github.com/flyteorg/flyteidl => github.com/MorpheusXAUT/flyteidl v0.24.22-0.20220414153602-81257a627c30 - github.com/flyteorg/flytepropeller => github.com/MorpheusXAUT/flytepropeller v0.16.48-0.20220421090510-afc31b9b41e8 -) - require ( cloud.google.com/go/iam v0.1.0 cloud.google.com/go/storage v1.14.0 @@ -204,3 +199,7 @@ require ( ) replace github.com/robfig/cron/v3 => github.com/unionai/cron/v3 v3.0.2-0.20210825070134-bfc34418fe84 + +replace github.com/flyteorg/flyteidl => github.com/MorpheusXAUT/flyteidl v0.24.22-0.20220426074538-c412ba26cf64 + +replace github.com/flyteorg/flytepropeller => github.com/MorpheusXAUT/flytepropeller v0.16.48-0.20220426084227-48d84e5890de diff --git a/go.sum b/go.sum index 150d15d1b..7dc5a5354 100644 --- a/go.sum +++ b/go.sum @@ -133,10 +133,10 @@ github.com/Masterminds/semver/v3 v3.1.1 h1:hLg3sBzpNErnxhQtUy/mmLR2I9foDujNK030I github.com/Masterminds/semver/v3 v3.1.1/go.mod h1:VPu/7SZ7ePZ3QOrcuXROw5FAcLl4a0cBrbBpGY/8hQs= github.com/Microsoft/go-winio v0.4.11/go.mod h1:VhR8bwka0BXejwEJY73c50VrPtXAaKcyvVC4A4RozmA= github.com/Microsoft/go-winio v0.4.14/go.mod h1:qXqCSQ3Xa7+6tgxaGTIe4Kpcdsi+P8jBhyzoq1bpyYA= -github.com/MorpheusXAUT/flyteidl v0.24.22-0.20220414153602-81257a627c30 h1:NvHNJm+IcLBOb5Te3trSs7KJTqLJCMkcLgZ9WmxLQoA= -github.com/MorpheusXAUT/flyteidl v0.24.22-0.20220414153602-81257a627c30/go.mod h1:vHSugApgS3hRITIafzQDU8DZD/W8wFRfFcgaFU35Dww= -github.com/MorpheusXAUT/flytepropeller v0.16.48-0.20220421090510-afc31b9b41e8 h1:LXT81FQ5DeKGeid8qDPGg3i05TWtAUoIWwlY23pWWwg= -github.com/MorpheusXAUT/flytepropeller v0.16.48-0.20220421090510-afc31b9b41e8/go.mod h1:QBC3GoCUY0q4k69vXUBVnJZZ3XxkKHLj8KnmB5zsfF4= +github.com/MorpheusXAUT/flyteidl v0.24.22-0.20220426074538-c412ba26cf64 h1:iVKLgar06cCYvfbXlRrKbgmr9CEwYAlIFcX1XExwH14= +github.com/MorpheusXAUT/flyteidl v0.24.22-0.20220426074538-c412ba26cf64/go.mod h1:vHSugApgS3hRITIafzQDU8DZD/W8wFRfFcgaFU35Dww= +github.com/MorpheusXAUT/flytepropeller v0.16.48-0.20220426084227-48d84e5890de h1:9zo2owuWx47dzWD8Uhxrw3F5Oq5H0JT3YgVnErAhV08= +github.com/MorpheusXAUT/flytepropeller v0.16.48-0.20220426084227-48d84e5890de/go.mod h1:T0+gG1Nu2v9SqYj1zZ9tERfvqWFHcWACLvXSoRwcUgw= github.com/NYTimes/gizmo v1.3.6 h1:K+GRagPdAxojsT1TlTQlMkTeOmgfLxSdvuOhdki7GG0= github.com/NYTimes/gizmo v1.3.6/go.mod h1:8S8QVnITA40p/1jGsUMcPI8R9SSKkoKu+8WF13s9Uhw= github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ= diff --git a/pkg/manager/impl/execution_manager.go b/pkg/manager/impl/execution_manager.go index 2b4252d8d..2f5151c5d 100644 --- a/pkg/manager/impl/execution_manager.go +++ b/pkg/manager/impl/execution_manager.go @@ -51,6 +51,7 @@ import ( "github.com/benbjohnson/clock" "github.com/flyteorg/flyteadmin/pkg/manager/impl/shared" "github.com/golang/protobuf/proto" + "github.com/golang/protobuf/ptypes/wrappers" ) const childContainerQueueKey = "child_queue" @@ -447,8 +448,8 @@ type WorkflowExecutionConfigInterface interface { GetAnnotations() *admin.Annotations // GetLabels Custom labels to be applied to a triggered execution resource. GetLabels() *admin.Labels - // GetInterruptible indicates a workflow should be flagged as interruptible for a single execution. - GetInterruptible() bool + // GetInterruptible indicates a workflow should be flagged as interruptible for a single execution. If omitted, the workflow's default is used. + GetInterruptible() *wrappers.BoolValue } // Merge into workflowExecConfig from spec and return true if any value has been changed @@ -486,8 +487,13 @@ func mergeIntoExecConfig(workflowExecConfig *admin.WorkflowExecutionConfig, spec isChanged = true } - if !workflowExecConfig.GetInterruptible() && spec.GetInterruptible() { - workflowExecConfig.Interruptible = true + // Override interruptible flag if workflow execution config does not have a value set or the spec sets a different + // value that defined as the workflow default. This allows for workflows to have their interruptible setting + // explicitly turned on and off for a single execution. + if (workflowExecConfig.GetInterruptible() == nil && spec.GetInterruptible() != nil) || + (workflowExecConfig.GetInterruptible() != nil && spec.GetInterruptible() != nil && + workflowExecConfig.GetInterruptible().GetValue() != spec.GetInterruptible().GetValue()) { + workflowExecConfig.Interruptible = spec.GetInterruptible() isChanged = true } diff --git a/pkg/manager/impl/execution_manager_test.go b/pkg/manager/impl/execution_manager_test.go index dcd928db5..09bf7a76e 100644 --- a/pkg/manager/impl/execution_manager_test.go +++ b/pkg/manager/impl/execution_manager_test.go @@ -30,9 +30,8 @@ import ( "k8s.io/apimachinery/pkg/api/resource" - "k8s.io/apimachinery/pkg/util/sets" - eventWriterMocks "github.com/flyteorg/flyteadmin/pkg/async/events/mocks" + "k8s.io/apimachinery/pkg/util/sets" "github.com/flyteorg/flyteadmin/auth" @@ -60,6 +59,7 @@ import ( "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" mockScope "github.com/flyteorg/flytestdlib/promutils" "github.com/golang/protobuf/proto" + "github.com/golang/protobuf/ptypes/wrappers" "github.com/stretchr/testify/assert" ) @@ -908,149 +908,107 @@ func TestCreateExecutionDynamicLabelsAndAnnotations(t *testing.T) { assert.Equal(t, expectedResponse, response) } -func TestCreateExecutionDefaultInterruptible(t *testing.T) { - t.Run("LaunchPlan", func(t *testing.T) { - // Interruptible flag of request execution spec defaults to false if omitted - request := testutils.GetExecutionRequest() - request.Spec.Interruptible = false - - repository := getMockRepositoryForExecTest() - setDefaultLpCallbackForExecTest(repository) - setDefaultTaskCallbackForExecTest(repository) - - exCreateFunc := func(ctx context.Context, input models.Execution) error { - var spec admin.ExecutionSpec - err := proto.Unmarshal(input.Spec, &spec) - assert.Nil(t, err) - assert.NotEqual(t, uint(0), input.LaunchPlanID) - assert.Equal(t, uint(0), input.TaskID) - assert.False(t, spec.Interruptible) - return nil - } - repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetCreateCallback(exCreateFunc) - - mockExecutor := workflowengineMocks.WorkflowExecutor{} - mockExecutor.OnExecuteMatch(mock.Anything, mock.Anything, mock.Anything).Return(workflowengineInterfaces.ExecutionResponse{}, nil) - mockExecutor.OnID().Return("testMockExecutor") - r := plugins.NewRegistry() - r.RegisterDefault(plugins.PluginIDWorkflowExecutor, &mockExecutor) - execManager := NewExecutionManager(repository, r, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{}) - - response, err := execManager.CreateExecution(context.Background(), request, requestedAt) - assert.Nil(t, err) - assert.True(t, proto.Equal(&core.WorkflowExecutionIdentifier{ - Project: "project", - Domain: "domain", - Name: "name", - }, response.Id)) - }) - - t.Run("Task", func(t *testing.T) { - // Interruptible flag of request execution spec defaults to false if omitted - request := testutils.GetExecutionRequest() - request.Spec.LaunchPlan.ResourceType = core.ResourceType_TASK - request.Spec.Interruptible = false - - repository := getMockRepositoryForExecTest() - setDefaultLpCallbackForExecTest(repository) - setDefaultTaskCallbackForExecTest(repository) - - exCreateFunc := func(ctx context.Context, input models.Execution) error { - var spec admin.ExecutionSpec - err := proto.Unmarshal(input.Spec, &spec) - assert.Nil(t, err) - assert.Equal(t, uint(0), input.LaunchPlanID) - assert.NotEqual(t, uint(0), input.TaskID) - assert.False(t, spec.Interruptible) - return nil - } - repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetCreateCallback(exCreateFunc) - mockExecutor := workflowengineMocks.WorkflowExecutor{} - mockExecutor.OnExecuteMatch(mock.Anything, mock.Anything, mock.Anything).Return(workflowengineInterfaces.ExecutionResponse{}, nil) - mockExecutor.OnID().Return("testMockExecutor") - r := plugins.NewRegistry() - r.RegisterDefault(plugins.PluginIDWorkflowExecutor, &mockExecutor) - execManager := NewExecutionManager(repository, r, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{}) - - response, err := execManager.CreateExecution(context.Background(), request, requestedAt) - assert.Nil(t, err) - assert.True(t, proto.Equal(&core.WorkflowExecutionIdentifier{ - Project: "project", - Domain: "domain", - Name: "name", - }, response.Id)) - }) -} - -func TestCreateExecutionInterruptibleOverride(t *testing.T) { - t.Run("LaunchPlan", func(t *testing.T) { - request := testutils.GetExecutionRequest() - request.Spec.Interruptible = true - - repository := getMockRepositoryForExecTest() - setDefaultLpCallbackForExecTest(repository) - setDefaultTaskCallbackForExecTest(repository) +func TestCreateExecutionInterruptible(t *testing.T) { + enable := true + disable := false + tests := []struct { + name string + task bool + interruptible *bool + want bool + }{ + { + name: "LaunchPlanDefault", + task: false, + interruptible: nil, + want: false, + }, + { + name: "LaunchPlanDisable", + task: false, + interruptible: &disable, + want: false, + }, + { + name: "LaunchPlanEnable", + task: false, + interruptible: &enable, + want: true, + }, + { + name: "TaskDefault", + task: true, + interruptible: nil, + want: false, + }, + { + name: "TaskDisable", + task: true, + interruptible: &disable, + want: false, + }, + { + name: "TaskEnable", + task: true, + interruptible: &enable, + want: true, + }, + } - exCreateFunc := func(ctx context.Context, input models.Execution) error { - var spec admin.ExecutionSpec - err := proto.Unmarshal(input.Spec, &spec) - assert.Nil(t, err) - assert.NotEqual(t, uint(0), input.LaunchPlanID) - assert.Equal(t, uint(0), input.TaskID) - assert.True(t, spec.Interruptible) - return nil - } - repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetCreateCallback(exCreateFunc) - mockExecutor := workflowengineMocks.WorkflowExecutor{} - mockExecutor.OnExecuteMatch(mock.Anything, mock.Anything, mock.Anything).Return(workflowengineInterfaces.ExecutionResponse{}, nil) - mockExecutor.OnID().Return("testMockExecutor") - r := plugins.NewRegistry() - r.RegisterDefault(plugins.PluginIDWorkflowExecutor, &mockExecutor) - execManager := NewExecutionManager(repository, r, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{}) + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() - response, err := execManager.CreateExecution(context.Background(), request, requestedAt) - assert.Nil(t, err) - assert.True(t, proto.Equal(&core.WorkflowExecutionIdentifier{ - Project: "project", - Domain: "domain", - Name: "name", - }, response.Id)) - }) + request := testutils.GetExecutionRequest() + if tt.task { + request.Spec.LaunchPlan.ResourceType = core.ResourceType_TASK + } + if tt.interruptible == nil { + request.Spec.Interruptible = nil + } else { + request.Spec.Interruptible = &wrappers.BoolValue{Value: *tt.interruptible} + } - t.Run("Task", func(t *testing.T) { - request := testutils.GetExecutionRequest() - request.Spec.LaunchPlan.ResourceType = core.ResourceType_TASK - request.Spec.Interruptible = true + repository := getMockRepositoryForExecTest() + setDefaultLpCallbackForExecTest(repository) + setDefaultTaskCallbackForExecTest(repository) + + exCreateFunc := func(ctx context.Context, input models.Execution) error { + var spec admin.ExecutionSpec + err := proto.Unmarshal(input.Spec, &spec) + assert.Nil(t, err) + + if tt.task { + assert.Equal(t, uint(0), input.LaunchPlanID) + assert.NotEqual(t, uint(0), input.TaskID) + } else { + assert.NotEqual(t, uint(0), input.LaunchPlanID) + assert.Equal(t, uint(0), input.TaskID) + } + + if tt.interruptible == nil { + assert.Nil(t, spec.GetInterruptible()) + } else { + assert.NotNil(t, spec.GetInterruptible()) + assert.Equal(t, *tt.interruptible, spec.GetInterruptible().GetValue()) + } + + return nil + } - repository := getMockRepositoryForExecTest() - setDefaultLpCallbackForExecTest(repository) - setDefaultTaskCallbackForExecTest(repository) + repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetCreateCallback(exCreateFunc) + mockExecutor := workflowengineMocks.WorkflowExecutor{} + mockExecutor.OnExecuteMatch(mock.Anything, mock.Anything, mock.Anything).Return(workflowengineInterfaces.ExecutionResponse{}, nil) + mockExecutor.OnID().Return("testMockExecutor") + r := plugins.NewRegistry() + r.RegisterDefault(plugins.PluginIDWorkflowExecutor, &mockExecutor) + execManager := NewExecutionManager(repository, r, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{}) - exCreateFunc := func(ctx context.Context, input models.Execution) error { - var spec admin.ExecutionSpec - err := proto.Unmarshal(input.Spec, &spec) + _, err := execManager.CreateExecution(context.Background(), request, requestedAt) assert.Nil(t, err) - assert.Equal(t, uint(0), input.LaunchPlanID) - assert.NotEqual(t, uint(0), input.TaskID) - assert.True(t, spec.Interruptible) - return nil - } - repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetCreateCallback(exCreateFunc) - mockExecutor := workflowengineMocks.WorkflowExecutor{} - mockExecutor.OnExecuteMatch(mock.Anything, mock.Anything, mock.Anything).Return(workflowengineInterfaces.ExecutionResponse{}, nil) - mockExecutor.OnID().Return("testMockExecutor") - r := plugins.NewRegistry() - r.RegisterDefault(plugins.PluginIDWorkflowExecutor, &mockExecutor) - execManager := NewExecutionManager(repository, r, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{}) - - response, err := execManager.CreateExecution(context.Background(), request, requestedAt) - assert.Nil(t, err) - assert.True(t, proto.Equal(&core.WorkflowExecutionIdentifier{ - Project: "project", - Domain: "domain", - Name: "name", - }, response.Id)) - }) + }) + } } func makeExecutionGetFunc( @@ -1106,14 +1064,18 @@ func makeLegacyExecutionGetFunc( } func makeExecutionInterruptibleGetFunc( - t *testing.T, closureBytes []byte, startTime *time.Time, interruptible bool) repositoryMocks.GetExecutionFunc { + t *testing.T, closureBytes []byte, startTime *time.Time, interruptible *bool) repositoryMocks.GetExecutionFunc { return func(ctx context.Context, input interfaces.Identifier) (models.Execution, error) { assert.Equal(t, "project", input.Project) assert.Equal(t, "domain", input.Domain) assert.Equal(t, "name", input.Name) request := testutils.GetExecutionRequest() - request.Spec.Interruptible = interruptible + if interruptible == nil { + request.Spec.Interruptible = nil + } else { + request.Spec.Interruptible = &wrappers.BoolValue{Value: *interruptible} + } specBytes, err := proto.Marshal(request.Spec) assert.Nil(t, err) @@ -1294,7 +1256,8 @@ func TestRelaunchExecutionInterruptibleOverride(t *testing.T) { StartedAt: startTimeProto, } existingClosureBytes, _ := proto.Marshal(&existingClosure) - executionGetFunc := makeExecutionInterruptibleGetFunc(t, existingClosureBytes, &startTime, true) + interruptible := true + executionGetFunc := makeExecutionInterruptibleGetFunc(t, existingClosureBytes, &startTime, &interruptible) repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetGetCallback(executionGetFunc) var createCalled bool @@ -1309,13 +1272,13 @@ func TestRelaunchExecutionInterruptibleOverride(t *testing.T) { assert.Nil(t, err) assert.Equal(t, admin.ExecutionMetadata_RELAUNCH, spec.Metadata.Mode) assert.Equal(t, int32(admin.ExecutionMetadata_RELAUNCH), input.Mode) - assert.True(t, spec.Interruptible) + assert.NotNil(t, spec.GetInterruptible()) + assert.True(t, spec.GetInterruptible().GetValue()) return nil } repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetCreateCallback(exCreateFunc) - // Issue request. - response, err := execManager.RelaunchExecution(context.Background(), admin.ExecutionRelaunchRequest{ + _, err := execManager.RelaunchExecution(context.Background(), admin.ExecutionRelaunchRequest{ Id: &core.WorkflowExecutionIdentifier{ Project: "project", Domain: "domain", @@ -1323,21 +1286,8 @@ func TestRelaunchExecutionInterruptibleOverride(t *testing.T) { }, Name: "relaunchy", }, requestedAt) - - // And verify response. assert.Nil(t, err) - - expectedResponse := &admin.ExecutionCreateResponse{ - Id: &core.WorkflowExecutionIdentifier{ - Project: "project", - Domain: "domain", - Name: "relaunchy", - }, - } assert.True(t, createCalled) - assert.True(t, proto.Equal(expectedResponse, response)) - - // TODO: Test with inputs } func TestRecoverExecution(t *testing.T) { @@ -1594,7 +1544,8 @@ func TestRecoverExecutionInterruptibleOverride(t *testing.T) { StartedAt: startTimeProto, } existingClosureBytes, _ := proto.Marshal(&existingClosure) - executionGetFunc := makeExecutionInterruptibleGetFunc(t, existingClosureBytes, &startTime, true) + interruptible := true + executionGetFunc := makeExecutionInterruptibleGetFunc(t, existingClosureBytes, &startTime, &interruptible) repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetGetCallback(executionGetFunc) var createCalled bool @@ -1609,7 +1560,8 @@ func TestRecoverExecutionInterruptibleOverride(t *testing.T) { assert.Nil(t, err) assert.Equal(t, admin.ExecutionMetadata_RECOVERED, spec.Metadata.Mode) assert.Equal(t, int32(admin.ExecutionMetadata_RECOVERED), input.Mode) - assert.True(t, spec.Interruptible) + assert.NotNil(t, spec.GetInterruptible()) + assert.True(t, spec.GetInterruptible().GetValue()) return nil } repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetCreateCallback(exCreateFunc) diff --git a/pkg/runtime/interfaces/application_configuration.go b/pkg/runtime/interfaces/application_configuration.go index fda0e9f34..ea5ac6b13 100644 --- a/pkg/runtime/interfaces/application_configuration.go +++ b/pkg/runtime/interfaces/application_configuration.go @@ -5,6 +5,7 @@ import ( "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flytestdlib/config" + "github.com/golang/protobuf/ptypes/wrappers" "golang.org/x/time/rate" ) @@ -140,8 +141,15 @@ func (a *ApplicationConfig) GetLabels() *admin.Labels { } } -func (a *ApplicationConfig) GetInterruptible() bool { - return a.Interruptible +func (a *ApplicationConfig) GetInterruptible() *wrappers.BoolValue { + // only return interruptible override if set to true as all workflows would be overwritten by the zero value false otherwise + if !a.Interruptible { + return nil + } + + return &wrappers.BoolValue{ + Value: true, + } } // This section holds common config for AWS diff --git a/pkg/workflowengine/impl/prepare_execution.go b/pkg/workflowengine/impl/prepare_execution.go index 24a3e6b40..0811298ce 100644 --- a/pkg/workflowengine/impl/prepare_execution.go +++ b/pkg/workflowengine/impl/prepare_execution.go @@ -56,7 +56,10 @@ func addExecutionOverrides(taskPluginOverrides []*admin.PluginOverride, } if workflowExecutionConfig != nil { executionConfig.MaxParallelism = uint32(workflowExecutionConfig.MaxParallelism) - executionConfig.Interruptible = workflowExecutionConfig.Interruptible + if workflowExecutionConfig.GetInterruptible() != nil { + interruptible := workflowExecutionConfig.GetInterruptible().GetValue() + executionConfig.Interruptible = &interruptible + } } if taskResources != nil { var requests = v1alpha1.TaskResourceSpec{} diff --git a/pkg/workflowengine/impl/prepare_execution_test.go b/pkg/workflowengine/impl/prepare_execution_test.go index 2a10e763a..87c122087 100644 --- a/pkg/workflowengine/impl/prepare_execution_test.go +++ b/pkg/workflowengine/impl/prepare_execution_test.go @@ -11,6 +11,7 @@ import ( "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" "github.com/golang/protobuf/proto" + "github.com/golang/protobuf/ptypes/wrappers" "github.com/stretchr/testify/assert" "k8s.io/apimachinery/pkg/api/resource" ) @@ -148,11 +149,12 @@ func TestAddExecutionOverrides(t *testing.T) { }) t.Run("interruptible", func(t *testing.T) { workflowExecutionConfig := &admin.WorkflowExecutionConfig{ - Interruptible: true, + Interruptible: &wrappers.BoolValue{Value: true}, } workflow := &v1alpha1.FlyteWorkflow{} addExecutionOverrides(nil, workflowExecutionConfig, nil, nil, workflow) - assert.True(t, workflow.ExecutionConfig.Interruptible) + assert.NotNil(t, workflow.ExecutionConfig.Interruptible) + assert.True(t, *workflow.ExecutionConfig.Interruptible) }) } @@ -195,7 +197,7 @@ func TestPrepareFlyteWorkflow(t *testing.T) { }, ExecutionConfig: &admin.WorkflowExecutionConfig{ MaxParallelism: 50, - Interruptible: true, + Interruptible: &wrappers.BoolValue{Value: true}, }, RecoveryExecution: recoveryNodeExecutionID, EventVersion: 1, @@ -227,7 +229,8 @@ func TestPrepareFlyteWorkflow(t *testing.T) { }, flyteWorkflow.ExecutionConfig.TaskPluginImpls) assert.Equal(t, flyteWorkflow.ServiceAccountName, testK8sServiceAccountSc) assert.Equal(t, flyteWorkflow.ExecutionConfig.MaxParallelism, uint32(50)) - assert.True(t, flyteWorkflow.ExecutionConfig.Interruptible) + assert.NotNil(t, flyteWorkflow.ExecutionConfig.Interruptible) + assert.True(t, *flyteWorkflow.ExecutionConfig.Interruptible) assert.True(t, proto.Equal(recoveryNodeExecutionID, flyteWorkflow.ExecutionConfig.RecoveryExecution.WorkflowExecutionIdentifier)) assert.Equal(t, flyteWorkflow.WorkflowMeta.EventVersion, v1alpha1.EventVersion(1)) assert.Equal(t, flyteWorkflow.RawOutputDataConfig, v1alpha1.RawOutputDataConfig{ From 353541fa06c5de255f86ebc9e81025bd7317a709 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nick=20M=C3=BCller?= Date: Wed, 27 Apr 2022 11:52:43 +0200 Subject: [PATCH 4/5] Test comments cleanup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Nick Müller --- pkg/manager/impl/execution_manager_test.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/pkg/manager/impl/execution_manager_test.go b/pkg/manager/impl/execution_manager_test.go index 09bf7a76e..b9d02c210 100644 --- a/pkg/manager/impl/execution_manager_test.go +++ b/pkg/manager/impl/execution_manager_test.go @@ -2571,7 +2571,6 @@ func TestExecutionManager_PublishNotifications(t *testing.T) { workflowRequest := admin.WorkflowExecutionEventRequest{ Event: &event.WorkflowExecutionEvent{ Phase: core.WorkflowExecution_FAILED, - // ExecutionId: "1234", OutputResult: &event.WorkflowExecutionEvent_Error{ Error: &core.ExecutionError{ Code: "CodeBad", @@ -2653,7 +2652,6 @@ func TestExecutionManager_PublishNotificationsTransformError(t *testing.T) { workflowRequest := admin.WorkflowExecutionEventRequest{ Event: &event.WorkflowExecutionEvent{ Phase: core.WorkflowExecution_FAILED, - // ExecutionId: "1234", OutputResult: &event.WorkflowExecutionEvent_Error{ Error: &core.ExecutionError{ Code: "CodeBad", @@ -2712,7 +2710,6 @@ func TestExecutionManager_TestExecutionManager_PublishNotificationsTransformErro workflowRequest := admin.WorkflowExecutionEventRequest{ Event: &event.WorkflowExecutionEvent{ Phase: core.WorkflowExecution_FAILED, - // ExecutionId: "1234", OutputResult: &event.WorkflowExecutionEvent_Error{ Error: &core.ExecutionError{ Code: "CodeBad", From 58d09e2bfdaa254fd035e0cb17fa157284930753 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nick=20M=C3=BCller?= Date: Tue, 3 May 2022 00:00:45 +0200 Subject: [PATCH 5/5] Use released flyteidl/flytepropeller versions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Nick Müller --- go.mod | 8 ++------ go.sum | 14 +++++--------- 2 files changed, 7 insertions(+), 15 deletions(-) diff --git a/go.mod b/go.mod index 31271cef6..ddefdeb22 100644 --- a/go.mod +++ b/go.mod @@ -13,9 +13,9 @@ require ( github.com/cloudevents/sdk-go/v2 v2.8.0 github.com/coreos/go-oidc v2.2.1+incompatible github.com/evanphx/json-patch v4.9.0+incompatible - github.com/flyteorg/flyteidl v1.0.0 + github.com/flyteorg/flyteidl v1.1.0 github.com/flyteorg/flyteplugins v1.0.0 - github.com/flyteorg/flytepropeller v1.0.0 + github.com/flyteorg/flytepropeller v1.1.0 github.com/flyteorg/flytestdlib v1.0.0 github.com/flyteorg/stow v0.3.3 github.com/ghodss/yaml v1.0.0 @@ -199,7 +199,3 @@ require ( ) replace github.com/robfig/cron/v3 => github.com/unionai/cron/v3 v3.0.2-0.20210825070134-bfc34418fe84 - -replace github.com/flyteorg/flyteidl => github.com/MorpheusXAUT/flyteidl v0.24.22-0.20220426074538-c412ba26cf64 - -replace github.com/flyteorg/flytepropeller => github.com/MorpheusXAUT/flytepropeller v0.16.48-0.20220426142632-4fdc51f2f6bb diff --git a/go.sum b/go.sum index 565d3f8ed..4bddc6092 100644 --- a/go.sum +++ b/go.sum @@ -69,8 +69,6 @@ cloud.google.com/go/storage v1.14.0 h1:6RRlFMv1omScs6iq2hfE3IvgE+l6RfJPampq8UZc5 cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo= contrib.go.opencensus.io/exporter/stackdriver v0.13.1/go.mod h1:z2tyTZtPmQ2HvWH4cOmVDgtY+1lomfKdbLnkJvZdc8c= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= -github.com/Azure/azure-sdk-for-go v32.5.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= -github.com/Azure/azure-sdk-for-go v51.0.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= github.com/Azure/azure-sdk-for-go v62.3.0+incompatible h1:Ctfsn9UoA/BB4HMYQlbPPgNXdX0tZ4tmb85+KFb2+RE= github.com/Azure/azure-sdk-for-go v62.3.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= github.com/Azure/azure-sdk-for-go/sdk/azcore v0.21.1 h1:qoVeMsc9/fh/yhxVaA0obYjVH/oI/ihrOoMwsLS9KSA= @@ -133,10 +131,6 @@ github.com/Masterminds/semver/v3 v3.1.1 h1:hLg3sBzpNErnxhQtUy/mmLR2I9foDujNK030I github.com/Masterminds/semver/v3 v3.1.1/go.mod h1:VPu/7SZ7ePZ3QOrcuXROw5FAcLl4a0cBrbBpGY/8hQs= github.com/Microsoft/go-winio v0.4.11/go.mod h1:VhR8bwka0BXejwEJY73c50VrPtXAaKcyvVC4A4RozmA= github.com/Microsoft/go-winio v0.4.14/go.mod h1:qXqCSQ3Xa7+6tgxaGTIe4Kpcdsi+P8jBhyzoq1bpyYA= -github.com/MorpheusXAUT/flyteidl v0.24.22-0.20220426074538-c412ba26cf64 h1:iVKLgar06cCYvfbXlRrKbgmr9CEwYAlIFcX1XExwH14= -github.com/MorpheusXAUT/flyteidl v0.24.22-0.20220426074538-c412ba26cf64/go.mod h1:vHSugApgS3hRITIafzQDU8DZD/W8wFRfFcgaFU35Dww= -github.com/MorpheusXAUT/flytepropeller v0.16.48-0.20220426142632-4fdc51f2f6bb h1:SuNDqyIzBidGsaU01b+Zs/BOvXsOz30+nUD4PBC0TSs= -github.com/MorpheusXAUT/flytepropeller v0.16.48-0.20220426142632-4fdc51f2f6bb/go.mod h1:ioW3JfUilrvmytLSks/Qi/nGGqTx6m1fzlgZgS6LTZ4= github.com/NYTimes/gizmo v1.3.6 h1:K+GRagPdAxojsT1TlTQlMkTeOmgfLxSdvuOhdki7GG0= github.com/NYTimes/gizmo v1.3.6/go.mod h1:8S8QVnITA40p/1jGsUMcPI8R9SSKkoKu+8WF13s9Uhw= github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ= @@ -371,9 +365,13 @@ github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94= github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ= github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= +github.com/flyteorg/flyteidl v1.0.0/go.mod h1:JW0z1ZaHS9zWvDAwSMIyGhsf+V4zrzBBgh5IuqzMFCM= +github.com/flyteorg/flyteidl v1.1.0 h1:f8tdMXOuorS/d+4Ut2QarfDbdCOriK0S+EnlQzrwz9E= +github.com/flyteorg/flyteidl v1.1.0/go.mod h1:JW0z1ZaHS9zWvDAwSMIyGhsf+V4zrzBBgh5IuqzMFCM= github.com/flyteorg/flyteplugins v1.0.0 h1:77hUJjiIxBmQ9rd3+cXjSGnzOVAFrSzCd59aIaYFB/8= github.com/flyteorg/flyteplugins v1.0.0/go.mod h1:4Cpn+9RfanIieTTh2XsuL6zPYXtsR5UDe8YaEmXONT4= -github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220= +github.com/flyteorg/flytepropeller v1.1.0 h1:R4iVoznTtpa10VC+CqCi4Wal/Yc+cng3dOi7hCLpi+Y= +github.com/flyteorg/flytepropeller v1.1.0/go.mod h1:QkWYTYZ3lrCY5nXKrYkVwbaGB2IhfzDmIGAWeWzuoak= github.com/flyteorg/flytestdlib v1.0.0 h1:gb99ignMsVcNTUmWzArtcIDdkRjyzQQVBkWNOQakiFg= github.com/flyteorg/flytestdlib v1.0.0/go.mod h1:QSVN5wIM1lM9d60eAEbX7NwweQXW96t5x4jbyftn89c= github.com/flyteorg/stow v0.3.3 h1:tzeNl8mSZFL3oJDi0ACZj6FAineQAF4qyEp6bXtIdQY= @@ -858,7 +856,6 @@ github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0U github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gotestyourself/gotestyourself v1.3.0/go.mod h1:zZKM6oeNM8k+FRljX1mnzVYeS8wiGgQyvST1/GafPbY= github.com/gotestyourself/gotestyourself v2.2.0+incompatible/go.mod h1:zZKM6oeNM8k+FRljX1mnzVYeS8wiGgQyvST1/GafPbY= -github.com/graymeta/stow v0.2.7/go.mod h1:JAs139Zr29qfsecy7b+h9DRsWXbFbsd7LCrbCDYI84k= github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= @@ -1654,7 +1651,6 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201203163018-be400aefbc4c/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= golang.org/x/crypto v0.0.0-20201217014255-9d1352758620/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= -golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=