diff --git a/cmd/register/files_test.go b/cmd/register/files_test.go index 0eec77014b..129d4f49c1 100644 --- a/cmd/register/files_test.go +++ b/cmd/register/files_test.go @@ -60,6 +60,31 @@ func TestRegisterFromFiles(t *testing.T) { err = registerFromFilesFunc(s.Ctx, args, s.CmdCtx) assert.Nil(t, err) }) + t.Run("Register a workflow with a failure node", func(t *testing.T) { + s := setup() + testScope := promutils.NewTestScope() + labeled.SetMetricKeys(contextutils.AppNameKey, contextutils.ProjectKey, contextutils.DomainKey) + registerFilesSetup() + rconfig.DefaultFilesConfig.Archive = true + rconfig.DefaultFilesConfig.OutputLocationPrefix = s3Output + rconfig.DefaultFilesConfig.DeprecatedSourceUploadPath = s3Output + mockStorage, err := storage.NewDataStore(&storage.Config{ + Type: storage.TypeMemory, + }, testScope.NewSubScope("flytectl")) + assert.Nil(t, err) + Client = mockStorage + + args := []string{"testdata/failure-node.tgz"} + s.MockAdminClient.OnCreateTaskMatch(mock.Anything, mock.Anything).Return(nil, nil) + s.MockAdminClient.OnCreateWorkflowMatch(mock.Anything, mock.Anything).Return(nil, nil) + s.MockAdminClient.OnCreateLaunchPlanMatch(mock.Anything, mock.Anything).Return(nil, nil) + s.MockAdminClient.OnUpdateLaunchPlanMatch(mock.Anything, mock.Anything).Return(nil, nil) + mockDataProxy := s.MockClient.DataProxyClient().(*mocks.DataProxyServiceClient) + mockDataProxy.OnCreateUploadLocationMatch(s.Ctx, mock.Anything).Return(&service.CreateUploadLocationResponse{}, nil) + + err = registerFromFilesFunc(s.Ctx, args, s.CmdCtx) + assert.Nil(t, err) + }) t.Run("Failed fast registration while uploading the codebase", func(t *testing.T) { s := setup() registerFilesSetup() diff --git a/cmd/register/register.go b/cmd/register/register.go index a04c99bd19..7caa1e9bbd 100644 --- a/cmd/register/register.go +++ b/cmd/register/register.go @@ -10,7 +10,7 @@ import ( // Long descriptions are whitespace sensitive when generating docs using sphinx. const ( registerCmdShort = "Registers tasks, workflows, and launch plans from a list of generated serialized files." - registercmdLong = ` + registerCmdLong = ` Take input files as serialized versions of the tasks/workflows/launchplans and register them with FlyteAdmin. Currently, these input files are protobuf files generated as output from Flytekit serialize. Project and Domain are mandatory fields to be passed for registration and an optional version which defaults to v1. @@ -23,7 +23,7 @@ func RemoteRegisterCommand() *cobra.Command { registerCmd := &cobra.Command{ Use: "register", Short: registerCmdShort, - Long: registercmdLong, + Long: registerCmdLong, } registerResourcesFuncs := map[string]cmdcore.CommandEntry{ "files": {CmdFunc: registerFromFilesFunc, Aliases: []string{"file"}, PFlagProvider: rconfig.DefaultFilesConfig, diff --git a/cmd/register/register_util.go b/cmd/register/register_util.go index e4dd4355ad..5234e86697 100644 --- a/cmd/register/register_util.go +++ b/cmd/register/register_util.go @@ -491,6 +491,11 @@ func hydrateSpec(message proto.Message, uploadLocation storage.DataReference, co return err } } + if workflowSpec.Template.GetFailureNode() != nil { + if err := hydrateNode(workflowSpec.Template.GetFailureNode(), config.Version, config.Force); err != nil { + return err + } + } hydrateIdentifier(workflowSpec.Template.Id, config.Version, config.Force) for _, subWorkflow := range workflowSpec.SubWorkflows { for _, Noderef := range subWorkflow.Nodes { @@ -498,6 +503,11 @@ func hydrateSpec(message proto.Message, uploadLocation storage.DataReference, co return err } } + if subWorkflow.GetFailureNode() != nil { + if err := hydrateNode(subWorkflow.GetFailureNode(), config.Version, config.Force); err != nil { + return err + } + } hydrateIdentifier(subWorkflow.Id, config.Version, config.Force) } case *admin.TaskSpec: diff --git a/cmd/register/testdata/failure-node.tgz b/cmd/register/testdata/failure-node.tgz new file mode 100644 index 0000000000..7ac63e86fe Binary files /dev/null and b/cmd/register/testdata/failure-node.tgz differ