Skip to content

Commit

Permalink
feat: support zip for output artifacts archive. Fixes #8861 (#8973)
Browse files Browse the repository at this point in the history
Signed-off-by: Belyenochi <491537461q@gmail.com>
  • Loading branch information
Belyenochi authored Aug 2, 2022
1 parent a51e833 commit 55d15ae
Show file tree
Hide file tree
Showing 4 changed files with 265 additions and 13 deletions.
83 changes: 83 additions & 0 deletions util/archive/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package archive

import (
"archive/tar"
"archive/zip"
"compress/gzip"
"io"
"os"
Expand Down Expand Up @@ -51,6 +52,30 @@ func TarGzToWriter(sourcePath string, level int, w io.Writer) error {
return tarFile(sourcePath, tw)
}

// ZipToWriter zip the source path to the supplied writer
func ZipToWriter(sourcePath string, zw *zip.Writer) error {
sourcePath, err := filepath.Abs(sourcePath)
if err != nil {
return errors.InternalErrorf("getting absolute path: %v", err)
}
log.Infof("Zipping %s", sourcePath)
sourceFi, err := os.Stat(sourcePath)
if err != nil {
if os.IsNotExist(err) {
return errors.New(errors.CodeNotFound, err.Error())
}
return errors.InternalWrapError(err)
}
if !sourceFi.Mode().IsRegular() && !sourceFi.IsDir() {
return errors.InternalErrorf("%s is not a regular file or directory", sourcePath)
}

if sourceFi.IsDir() {
return zipDir(sourcePath, zw)
}
return zipFile(sourcePath, zw)
}

func tarDir(sourcePath string, tw *tar.Writer) error {
baseName := filepath.Base(sourcePath)
count := 0
Expand Down Expand Up @@ -134,3 +159,61 @@ func tarFile(sourcePath string, tw *tar.Writer) error {
_, err = io.Copy(tw, f)
return err
}

func zipDir(sourcePath string, zw *zip.Writer) error {
baseName := filepath.Base(sourcePath)
count := 0
err := filepath.Walk(sourcePath, func(fpath string, info os.FileInfo, err error) error {
if err != nil {
return errors.InternalWrapError(err)
}
if info.IsDir() {
return nil
}
// build the name to be used in the archive
nameInArchive, err := filepath.Rel(sourcePath, fpath)
if err != nil {
return errors.InternalWrapError(err)
}
nameInArchive = filepath.Join(baseName, nameInArchive)
log.Infof("writing %s", nameInArchive)
count++

fileWriter, err := zw.Create(nameInArchive)
if err != nil {
return errors.InternalWrapError(err)
}
if !info.Mode().IsRegular() {
return nil
}
f, err := os.Open(filepath.Clean(fpath))
if err != nil {
return errors.InternalWrapError(err)
}
defer f.Close()

// copy file data into zip writer
_, err = io.Copy(fileWriter, f)
if err != nil {
return err
}

return nil
})
log.Infof("archive[zip] %d files/dirs in %s", count, sourcePath)
return err
}

func zipFile(sourcePath string, zw *zip.Writer) error {
f, err := os.Open(filepath.Clean(sourcePath))
if err != nil {
return errors.InternalWrapError(err)
}
defer util.Close(f)
fileWriter, err := zw.Create(sourcePath)
if err != nil {
return errors.InternalWrapError(err)
}
_, err = io.Copy(fileWriter, f)
return err
}
66 changes: 66 additions & 0 deletions util/archive/archive_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package archive

import (
"archive/zip"
"bufio"
"compress/gzip"
"crypto/rand"
Expand Down Expand Up @@ -139,3 +140,68 @@ func TestTarFile(t *testing.T) {
})
}
}

func TestZipDirectory(t *testing.T) {
tests := []struct {
name string
src string
wantErr bool
}{
{
"dir_missing",
"./fake/dir",
true,
},
{
"dir_common",
"../../test/e2e",
false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
f, err := tempFile(os.TempDir()+"/argo-test", "dir-"+tt.name+"-", ".tgz")
assert.NoError(t, err)

log.Infof("Zipping to %s", f.Name())

err = ZipToWriter(tt.src, zip.NewWriter(f))
if tt.wantErr {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}

err = os.Remove(f.Name())
assert.NoError(t, err)

err = f.Close()
assert.NoError(t, err)
})
}
}

func TestZipFile(t *testing.T) {
t.Run("test_zip_file", func(t *testing.T) {
data, err := tempFile(os.TempDir()+"/argo-test", "file-random-", "")
assert.NoError(t, err)
_, err = data.WriteString("hello world")
assert.NoError(t, err)
err = data.Close()
assert.NoError(t, err)

dataZipPath := data.Name() + ".zip"
f, err := os.Create(dataZipPath)
assert.NoError(t, err)

err = ZipToWriter(data.Name(), zip.NewWriter(f))
assert.NoError(t, err)

err = os.Remove(data.Name())
assert.NoError(t, err)
err = f.Close()
assert.NoError(t, err)
err = os.Remove(f.Name())
assert.NoError(t, err)
})
}
32 changes: 32 additions & 0 deletions workflow/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,22 @@ func (we *WorkflowExecutor) stageArchiveFile(containerName string, art *wfv1.Art
}
return fileName, mountedArtPath, nil
}
if strategy.Zip != nil {
fileName := fmt.Sprintf("%s.zip", art.Name)
localArtPath := filepath.Join(tempOutArtDir, fileName)
f, err := os.Create(localArtPath)
if err != nil {
return "", "", argoerrs.InternalWrapError(err)
}
zw := zip.NewWriter(f)
defer zw.Close()
err = archive.ZipToWriter(mountedArtPath, zw)
if err != nil {
return "", "", err
}
log.Infof("Successfully staged %s from mirrored volume mount %s", art.Path, mountedArtPath)
return fileName, localArtPath, nil
}
fileName := fmt.Sprintf("%s.tgz", art.Name)
localArtPath := filepath.Join(tempOutArtDir, fileName)
f, err := os.Create(localArtPath)
Expand Down Expand Up @@ -465,6 +481,22 @@ func (we *WorkflowExecutor) stageArchiveFile(containerName string, art *wfv1.Art
}
// In the future, if we were to support other compression formats (e.g. bzip2) or options
// the logic would go here, and compression would be moved out of the executors
if strategy.Zip != nil {
fileName = fmt.Sprintf("%s.zip", art.Name)
localArtPath = filepath.Join(tempOutArtDir, fileName)
f, err := os.Create(localArtPath)
if err != nil {
return "", "", argoerrs.InternalWrapError(err)
}
zw := zip.NewWriter(f)
defer zw.Close()
err = archive.ZipToWriter(unarchivedArtPath, zw)
if err != nil {
return "", "", err
}
log.Infof("Successfully zipped %s to %s", unarchivedArtPath, localArtPath)
return fileName, localArtPath, nil
}
return fileName, localArtPath, nil
}

Expand Down
97 changes: 84 additions & 13 deletions workflow/executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,21 +367,92 @@ func TestSaveArtifacts(t *testing.T) {
},
},
}
we := WorkflowExecutor{
PodName: fakePodName,
Template: templateWithOutParam,
ClientSet: fakeClientset,
Namespace: fakeNamespace,
RuntimeExecutor: &mockRuntimeExecutor,
templateOptionFalse := wfv1.Template{
Inputs: wfv1.Inputs{
Artifacts: []wfv1.Artifact{
{
Name: "samedir",
Path: "/samedir",
},
},
},
Outputs: wfv1.Outputs{
Artifacts: []wfv1.Artifact{
{
Name: "samedir",
Path: "/samedir",
Optional: false,
},
},
},
}
templateZipArchive := wfv1.Template{
Inputs: wfv1.Inputs{
Artifacts: []wfv1.Artifact{
{
Name: "samedir",
Path: "/samedir",
},
},
},
Outputs: wfv1.Outputs{
Artifacts: []wfv1.Artifact{
{
Name: "samedir",
Path: "/samedir",
Optional: true,
Archive: &wfv1.ArchiveStrategy{
Zip: &wfv1.ZipStrategy{},
},
},
},
},
}
tests := []struct {
workflowExecutor WorkflowExecutor
expectError bool
}{
{
workflowExecutor: WorkflowExecutor{
PodName: fakePodName,
Template: templateWithOutParam,
ClientSet: fakeClientset,
Namespace: fakeNamespace,
RuntimeExecutor: &mockRuntimeExecutor,
},
expectError: false,
},
{
workflowExecutor: WorkflowExecutor{
PodName: fakePodName,
Template: templateOptionFalse,
ClientSet: fakeClientset,
Namespace: fakeNamespace,
RuntimeExecutor: &mockRuntimeExecutor,
},
expectError: true,
},
{
workflowExecutor: WorkflowExecutor{
PodName: fakePodName,
Template: templateZipArchive,
ClientSet: fakeClientset,
Namespace: fakeNamespace,
RuntimeExecutor: &mockRuntimeExecutor,
},
expectError: false,
},
}

ctx := context.Background()
err := we.SaveArtifacts(ctx)
assert.NoError(t, err)

we.Template.Outputs.Artifacts[0].Optional = false
err = we.SaveArtifacts(ctx)
assert.Error(t, err)
for _, tt := range tests {
ctx := context.Background()
err := tt.workflowExecutor.SaveArtifacts(ctx)
if err != nil {
assert.Equal(t, tt.expectError, true)
continue
}
assert.Equal(t, tt.expectError, false)
}
}

func TestMonitorProgress(t *testing.T) {
Expand Down

0 comments on commit 55d15ae

Please sign in to comment.