Skip to content

Commit

Permalink
fix: Look in correct bucket when downloading artifacts (Template.Arch…
Browse files Browse the repository at this point in the history
…iveLocation configured) (#9301)

* fix: artifact server can use ArchiveLocation defined by Template

Signed-off-by: Julie Vogelman <julie_vogelman@intuit.com>

* fix: use Template ArchiveLocation

Signed-off-by: Julie Vogelman <julie_vogelman@intuit.com>

* fix: test

Signed-off-by: Julie Vogelman <julie_vogelman@intuit.com>
  • Loading branch information
juliev0 authored Aug 8, 2022
1 parent b356cb5 commit 0d77f55
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 33 deletions.
26 changes: 21 additions & 5 deletions server/artifacts/artifact_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,12 +384,28 @@ func (a *ArtifactServer) getArtifactAndDriver(ctx context.Context, nodeId, artif
return nil, nil, fmt.Errorf("artifact not found: %s", artifactName)
}

ar, err := a.artifactRepositories.Get(ctx, wf.Status.ArtifactRepositoryRef)
if err != nil {
return art, nil, err
// Artifact Location can be defined in various places:
// 1. In the Artifact itself
// 2. Defined by Controller configmap
// 3. Workflow spec defines artifactRepositoryRef which is a ConfigMap which defines the location
// 4. Template defines ArchiveLocation

templateName := wf.Status.Nodes[nodeId].TemplateName
template := wf.GetTemplateByName(templateName)
if template == nil {
return nil, nil, fmt.Errorf("no template found by the name of '%s' (which is the template associated with nodeId '%s'??", templateName, nodeId)
}

archiveLocation := template.ArchiveLocation // this is case 4
if !archiveLocation.HasLocation() {
ar, err := a.artifactRepositories.Get(ctx, wf.Status.ArtifactRepositoryRef) // this should handle cases 2 and 3
if err != nil {
return art, nil, err
}
archiveLocation = ar.ToArtifactLocation()
}
l := ar.ToArtifactLocation()
err = art.Relocate(l)

err := art.Relocate(archiveLocation) // if the Artifact defines the location (case 1), it will be used; otherwise whatever archiveLocation is set to
if err != nil {
return art, nil, err
}
Expand Down
156 changes: 128 additions & 28 deletions server/artifacts/artifact_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,28 @@ func (a *fakeArtifactDriver) Load(_ *wfv1.Artifact, path string) error {
return ioutil.WriteFile(path, a.data, 0o600)
}

var bucketsOfKeys = map[string][]string{
"my-bucket": []string{
"my-wf/my-node-1/my-s3-input-artifact.tgz",
"my-wf/my-node-1/my-s3-artifact-directory",
"my-wf/my-node-1/my-s3-artifact-directory/a.txt",
"my-wf/my-node-1/my-s3-artifact-directory/subdirectory/b.txt",
"my-wf/my-node-1/my-gcs-artifact",
"my-wf/my-node-1/my-gcs-artifact.tgz",
"my-wf/my-node-1/my-oss-artifact.zip",
"my-wf/my-node-1/my-s3-artifact.tgz",
},
"my-bucket-2": []string{
"my-wf/my-node-2/my-s3-artifact-bucket-2",
},
"my-bucket-3": []string{
"my-wf/my-node-2/my-s3-artifact-bucket-3",
},
}

func (a *fakeArtifactDriver) OpenStream(artifact *wfv1.Artifact) (io.ReadCloser, error) {
//fmt.Printf("deletethis: artifact=%+v\n", artifact)

key, err := artifact.ArtifactLocation.GetKey()
if err != nil {
return nil, err
Expand All @@ -63,6 +84,25 @@ func (a *fakeArtifactDriver) OpenStream(artifact *wfv1.Artifact) (io.ReadCloser,
} else if strings.HasSuffix(key, "somethingElseWentWrong.txt") {
return nil, errors.New("whatever")
}

if artifact.S3 != nil {
// make sure it's a recognizable bucket/key
keysInBucket, found := bucketsOfKeys[artifact.S3.Bucket]
if !found {
return nil, fmt.Errorf("artifact bucket not found: %+v", artifact)
}
foundKey := false
for _, recognizableKey := range keysInBucket {
if key == recognizableKey {
foundKey = true
break
}
}
if !foundKey {
return nil, fmt.Errorf("artifact key '%s' not found in bucket '%s'", key, artifact.S3.Bucket)
}
}

return io.NopCloser(bytes.NewReader(a.data)), nil
}

Expand Down Expand Up @@ -91,15 +131,15 @@ func (a *fakeArtifactDriver) ListObjects(artifact *wfv1.Artifact) ([]string, err
if artifact.Name == "my-s3-artifact-directory" {
if strings.HasSuffix(key, "subdirectory") {
return []string{
"my-wf/my-node/my-s3-artifact-directory/subdirectory/b.txt",
"my-wf/my-node/my-s3-artifact-directory/subdirectory/c.txt",
"my-wf/my-node-1/my-s3-artifact-directory/subdirectory/b.txt",
"my-wf/my-node-1/my-s3-artifact-directory/subdirectory/c.txt",
}, nil
} else {
return []string{
"my-wf/my-node/my-s3-artifact-directory/a.txt",
"my-wf/my-node/my-s3-artifact-directory/index.html",
"my-wf/my-node/my-s3-artifact-directory/subdirectory/b.txt",
"my-wf/my-node/my-s3-artifact-directory/subdirectory/c.txt",
"my-wf/my-node-1/my-s3-artifact-directory/a.txt",
"my-wf/my-node-1/my-s3-artifact-directory/index.html",
"my-wf/my-node-1/my-s3-artifact-directory/subdirectory/b.txt",
"my-wf/my-node-1/my-s3-artifact-directory/subdirectory/c.txt",
}, nil
}
}
Expand All @@ -114,16 +154,36 @@ func newServer() *ArtifactServer {
ObjectMeta: metav1.ObjectMeta{Namespace: "my-ns", Name: "my-wf", Labels: map[string]string{
common.LabelKeyControllerInstanceID: instanceId,
}},
Spec: wfv1.WorkflowSpec{
Templates: []wfv1.Template{
{
Name: "template-1",
},
{
Name: "template-2",
ArchiveLocation: &wfv1.ArtifactLocation{
S3: &wfv1.S3Artifact{
Key: "key-1",
S3Bucket: wfv1.S3Bucket{
Bucket: "my-bucket-3",
Endpoint: "minio:9000",
},
},
},
},
},
},
Status: wfv1.WorkflowStatus{
Nodes: wfv1.Nodes{
"my-node": wfv1.NodeStatus{
"my-node-1": wfv1.NodeStatus{
TemplateName: "template-1",
Inputs: &wfv1.Inputs{
Artifacts: wfv1.Artifacts{
{
Name: "my-s3-input-artifact",
ArtifactLocation: wfv1.ArtifactLocation{
S3: &wfv1.S3Artifact{
Key: "my-wf/my-node/my-s3-input-artifact.tgz",
Key: "my-wf/my-node-1/my-s3-input-artifact.tgz",
},
},
},
Expand All @@ -136,7 +196,7 @@ func newServer() *ArtifactServer {
ArtifactLocation: wfv1.ArtifactLocation{
S3: &wfv1.S3Artifact{
// S3 is a configured artifact repo, so does not need key
Key: "my-wf/my-node/my-s3-artifact.tgz",
Key: "my-wf/my-node-1/my-s3-artifact.tgz",
},
},
},
Expand All @@ -145,7 +205,7 @@ func newServer() *ArtifactServer {
ArtifactLocation: wfv1.ArtifactLocation{
S3: &wfv1.S3Artifact{
// S3 is a configured artifact repo, so does not need key
Key: "my-wf/my-node/my-s3-artifact-directory",
Key: "my-wf/my-node-1/my-s3-artifact-directory",
},
},
},
Expand All @@ -157,7 +217,7 @@ func newServer() *ArtifactServer {
GCSBucket: wfv1.GCSBucket{
Bucket: "my-bucket",
},
Key: "my-wf/my-node/my-gcs-artifact",
Key: "my-wf/my-node-1/my-gcs-artifact",
},
},
},
Expand All @@ -169,7 +229,7 @@ func newServer() *ArtifactServer {
GCSBucket: wfv1.GCSBucket{
Bucket: "my-bucket",
},
Key: "my-wf/my-node/my-gcs-artifact.tgz",
Key: "my-wf/my-node-1/my-gcs-artifact.tgz",
},
},
},
Expand All @@ -181,7 +241,37 @@ func newServer() *ArtifactServer {
OSSBucket: wfv1.OSSBucket{
Bucket: "my-bucket",
},
Key: "my-wf/my-node/my-oss-artifact.zip",
Key: "my-wf/my-node-1/my-oss-artifact.zip",
},
},
},
},
},
},

"my-node-2": wfv1.NodeStatus{
TemplateName: "template-2",
Outputs: &wfv1.Outputs{
Artifacts: wfv1.Artifacts{
{
Name: "my-s3-artifact-bucket-3",
ArtifactLocation: wfv1.ArtifactLocation{
S3: &wfv1.S3Artifact{
// S3 is a configured artifact repo, so does not need key
Key: "my-wf/my-node-2/my-s3-artifact-bucket-3",
},
},
},
{
Name: "my-s3-artifact-bucket-2",
ArtifactLocation: wfv1.ArtifactLocation{
S3: &wfv1.S3Artifact{
// S3 is a configured artifact repo, so does not need key
Key: "my-wf/my-node-2/my-s3-artifact-bucket-2",
S3Bucket: wfv1.S3Bucket{
Bucket: "my-bucket-2",
Endpoint: "minio:9000",
},
},
},
},
Expand Down Expand Up @@ -231,17 +321,17 @@ func TestArtifactServer_GetArtifactFile(t *testing.T) {
directoryFiles []string // verify these files are in there, if this is a directory
}{
{
path: "/artifact-files/my-ns/workflows/my-wf/my-node/outputs/my-s3-artifact-directory",
path: "/artifact-files/my-ns/workflows/my-wf/my-node-1/outputs/my-s3-artifact-directory",
statusCode: 307, // redirect
location: "/artifact-files/my-ns/workflows/my-wf/my-node/outputs/my-s3-artifact-directory/",
location: "/artifact-files/my-ns/workflows/my-wf/my-node-1/outputs/my-s3-artifact-directory/",
},
{
path: "/artifact-files/my-ns/workflows/my-wf/my-node/outputs/my-s3-artifact-directory/",
path: "/artifact-files/my-ns/workflows/my-wf/my-node-1/outputs/my-s3-artifact-directory/",
statusCode: 307, // redirect
location: "/artifact-files/my-ns/workflows/my-wf/my-node/outputs/my-s3-artifact-directory/index.html",
location: "/artifact-files/my-ns/workflows/my-wf/my-node-1/outputs/my-s3-artifact-directory/index.html",
},
{
path: "/artifact-files/my-ns/workflows/my-wf/my-node/outputs/my-s3-artifact-directory/subdirectory/",
path: "/artifact-files/my-ns/workflows/my-wf/my-node-1/outputs/my-s3-artifact-directory/subdirectory/",
statusCode: 200,
isDirectory: true,
directoryFiles: []string{
Expand All @@ -251,27 +341,37 @@ func TestArtifactServer_GetArtifactFile(t *testing.T) {
},
},
{
path: "/artifact-files/my-ns/workflows/my-wf/my-node/outputs/my-s3-artifact-directory/a.txt",
path: "/artifact-files/my-ns/workflows/my-wf/my-node-1/outputs/my-s3-artifact-directory/a.txt",
statusCode: 200,
isDirectory: false,
},
{
path: "/artifact-files/my-ns/workflows/my-wf/my-node/outputs/my-s3-artifact-directory/subdirectory/b.txt",
path: "/artifact-files/my-ns/workflows/my-wf/my-node-1/outputs/my-s3-artifact-directory/subdirectory/b.txt",
statusCode: 200,
isDirectory: false,
},
{
path: "/artifact-files/my-ns/workflows/my-wf/my-node/outputs/my-s3-artifact-directory/deletedFile.txt",
path: "/artifact-files/my-ns/workflows/my-wf/my-node-1/outputs/my-s3-artifact-directory/deletedFile.txt",
statusCode: 404,
isDirectory: false,
},
{
path: "/artifact-files/my-ns/workflows/my-wf/my-node/outputs/my-s3-artifact-directory/somethingElseWentWrong.txt",
path: "/artifact-files/my-ns/workflows/my-wf/my-node-1/outputs/my-s3-artifact-directory/somethingElseWentWrong.txt",
statusCode: 500,
isDirectory: false,
},
{
path: "/artifact-files/my-ns/workflows/my-wf/my-node/outputs/my-gcs-artifact-file/my-gcs-artifact.tgz",
path: "/artifact-files/my-ns/workflows/my-wf/my-node-1/outputs/my-gcs-artifact-file/my-gcs-artifact.tgz",
statusCode: 200,
isDirectory: false,
},
{
path: "/artifact-files/my-ns/workflows/my-wf/my-node-2/outputs/my-s3-artifact-bucket-3",
statusCode: 200,
isDirectory: false,
},
{
path: "/artifact-files/my-ns/workflows/my-wf/my-node-2/outputs/my-s3-artifact-bucket-2",
statusCode: 200,
isDirectory: false,
},
Expand Down Expand Up @@ -332,7 +432,7 @@ func TestArtifactServer_GetOutputArtifact(t *testing.T) {
for _, tt := range tests {
t.Run(tt.artifactName, func(t *testing.T) {
r := &http.Request{}
r.URL = mustParse(fmt.Sprintf("/artifacts/my-ns/my-wf/my-node/%s", tt.artifactName))
r.URL = mustParse(fmt.Sprintf("/artifacts/my-ns/my-wf/my-node-1/%s", tt.artifactName))
recorder := httptest.NewRecorder()

s.GetOutputArtifact(recorder, r)
Expand Down Expand Up @@ -364,7 +464,7 @@ func TestArtifactServer_GetInputArtifact(t *testing.T) {
for _, tt := range tests {
t.Run(tt.artifactName, func(t *testing.T) {
r := &http.Request{}
r.URL = mustParse(fmt.Sprintf("/input-artifacts/my-ns/my-wf/my-node/%s", tt.artifactName))
r.URL = mustParse(fmt.Sprintf("/input-artifacts/my-ns/my-wf/my-node-1/%s", tt.artifactName))
recorder := httptest.NewRecorder()
s.GetInputArtifact(recorder, r)
if assert.Equal(t, 200, recorder.Result().StatusCode) {
Expand Down Expand Up @@ -396,7 +496,7 @@ func TestArtifactServer_NodeWithoutArtifact(t *testing.T) {
func TestArtifactServer_GetOutputArtifactWithoutInstanceID(t *testing.T) {
s := newServer()
r := &http.Request{}
r.URL = mustParse("/artifacts/my-ns/your-wf/my-node/my-artifact")
r.URL = mustParse("/artifacts/my-ns/your-wf/my-node-1/my-artifact")
w := &testhttp.TestResponseWriter{}
s.GetOutputArtifact(w, r)
assert.NotEqual(t, 200, w.StatusCode)
Expand All @@ -405,7 +505,7 @@ func TestArtifactServer_GetOutputArtifactWithoutInstanceID(t *testing.T) {
func TestArtifactServer_GetOutputArtifactByUID(t *testing.T) {
s := newServer()
r := &http.Request{}
r.URL = mustParse("/artifacts/my-uuid/my-node/my-artifact")
r.URL = mustParse("/artifacts/my-uuid/my-node-1/my-artifact")
w := &testhttp.TestResponseWriter{}
s.GetOutputArtifactByUID(w, r)
assert.Equal(t, 401, w.StatusCode)
Expand All @@ -415,7 +515,7 @@ func TestArtifactServer_GetArtifactByUIDInvalidRequestPath(t *testing.T) {
s := newServer()
r := &http.Request{}
// missing my-artifact part to have a valid URL
r.URL = mustParse("/input-artifacts/my-uuid/my-node")
r.URL = mustParse("/input-artifacts/my-uuid/my-node-1")
w := &testhttp.TestResponseWriter{}
s.GetInputArtifactByUID(w, r)
// make sure there is no index out of bounds error
Expand Down

0 comments on commit 0d77f55

Please sign in to comment.