From 851370db8122f4c5b5d4850397e45e4ff1d68a9e Mon Sep 17 00:00:00 2001 From: Michael Smith Date: Tue, 14 Apr 2020 15:05:15 -0700 Subject: [PATCH] Implement Writable for the Volume plugin Volumes now support writing content to files. Resolves #675. Signed-off-by: Michael Smith --- plugin/docker/volume.go | 45 ++++++++++++++++++++++++++++++++++++---- plugin/kubernetes/pvc.go | 26 +++++++++++++++-------- volume/core.go | 4 ++++ volume/dir_test.go | 9 ++++++-- volume/file.go | 10 +++++++++ volume/file_test.go | 17 +++++++++++++++ volume/fs.go | 39 ++++++++++++++++++++++++++++++++++ volume/fs_test.go | 38 ++++++++++++++++++++++++++------- 8 files changed, 165 insertions(+), 23 deletions(-) diff --git a/plugin/docker/volume.go b/plugin/docker/volume.go index b95dad4cf..15bff6512 100644 --- a/plugin/docker/volume.go +++ b/plugin/docker/volume.go @@ -7,6 +7,8 @@ import ( "errors" "io" "io/ioutil" + "os" + "path/filepath" "strings" "time" @@ -75,10 +77,9 @@ func (v *volume) createContainer(ctx context.Context, cmd []string) (string, fun // Use tty to avoid messing with the extra log formatting. cfg := docontainer.Config{Image: "busybox", Cmd: cmd, Tty: true} mounts := []mount.Mount{{ - Type: mount.TypeVolume, - Source: v.Name(), - Target: mountpoint, - ReadOnly: true, + Type: mount.TypeVolume, + Source: v.Name(), + Target: mountpoint, }} hostcfg := docontainer.HostConfig{Mounts: mounts} netcfg := network.NetworkingConfig{} @@ -232,6 +233,42 @@ func (v *volume) VolumeStream(ctx context.Context, path string) (io.ReadCloser, return plugin.CleanupReader{ReadCloser: output, Cleanup: cleanup}, nil } +func (v *volume) VolumeWrite(ctx context.Context, path string, b []byte, mode os.FileMode) error { + // Create a container that mounts a volume and waits. Use it to upload a file. + cid, cleanup, err := v.createContainer(ctx, []string{"sleep", "60"}) + if err != nil { + return err + } + defer cleanup() + + // Create a tar of the file contents and upload it. + dir, file := filepath.Split(path) + var buf bytes.Buffer + tw := tar.NewWriter(&buf) + mtime := time.Now() + hdr := tar.Header{ + Name: file, + Size: int64(len(b)), + Mode: int64(mode), + // Use PAX format to ensure compatibility with non-ASCII filenames. + Format: tar.FormatPAX, + // Use of PAX requires we set atime/ctime/mtime. Use now, we just read the file to update it. + AccessTime: mtime, + ChangeTime: mtime, + ModTime: mtime, + } + + if err := tw.WriteHeader(&hdr); err != nil { + return err + } else if _, err := tw.Write(b); err != nil { + return err + } else if err := tw.Close(); err != nil { + return err + } + + return v.client.CopyToContainer(ctx, cid, mountpoint+dir, &buf, types.CopyToContainerOptions{}) +} + func (v *volume) VolumeDelete(ctx context.Context, path string) (bool, error) { _, err := v.runInTemporaryContainer(ctx, []string{"rm", "-rf", mountpoint + path}) if err != nil { diff --git a/plugin/kubernetes/pvc.go b/plugin/kubernetes/pvc.go index 44b0ec02e..f8ba14af8 100644 --- a/plugin/kubernetes/pvc.go +++ b/plugin/kubernetes/pvc.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "io" + "os" "github.com/puppetlabs/wash/activity" "github.com/puppetlabs/wash/plugin" @@ -156,7 +157,7 @@ func (v *pvc) inContainer(ctx context.Context, fn containerCb) (interface{}, err // to inject a base path. type cmdBuilder func(string) []string -func (v *pvc) exec(ctx context.Context, buildCmd cmdBuilder) ([]byte, error) { +func (v *pvc) exec(ctx context.Context, buildCmd cmdBuilder, stdin io.Reader) ([]byte, error) { obj, err := v.inContainer(ctx, func(c *containerBase, mountpoint string, cleanup func()) (interface{}, error) { defer cleanup() @@ -164,7 +165,7 @@ func (v *pvc) exec(ctx context.Context, buildCmd cmdBuilder) ([]byte, error) { activity.Record(ctx, "Executing in %v: %v", c, cmd) var stdout, stderr bytes.Buffer - streamOpts := remotecommand.StreamOptions{Stdout: &stdout, Stderr: &stderr} + streamOpts := remotecommand.StreamOptions{Stdout: &stdout, Stderr: &stderr, Stdin: stdin} executor, err := c.newExecutor(ctx, cmd[0], cmd[1:], streamOpts) if err != nil { return []byte{}, err @@ -185,7 +186,7 @@ func (v *pvc) VolumeList(ctx context.Context, path string) (volume.DirMap, error output, err := v.exec(ctx, func(base string) []string { mountpoint = base return volume.StatCmdPOSIX(base+path, maxdepth) - }) + }, nil) if err != nil { return nil, err @@ -196,7 +197,7 @@ func (v *pvc) VolumeList(ctx context.Context, path string) (volume.DirMap, error func (v *pvc) VolumeRead(ctx context.Context, path string) ([]byte, error) { output, err := v.exec(ctx, func(base string) []string { return []string{"cat", base + path} - }) + }, nil) if err != nil { return nil, err } @@ -228,10 +229,17 @@ func (v *pvc) VolumeStream(ctx context.Context, path string) (io.ReadCloser, err return obj.(io.ReadCloser), err } +func (v *pvc) VolumeWrite(ctx context.Context, path string, b []byte, _ os.FileMode) error { + _, err := v.exec(ctx, func(base string) []string { + return []string{"cp", "/dev/stdin", base + path} + }, bytes.NewReader(b)) + return err +} + func (v *pvc) VolumeDelete(ctx context.Context, path string) (bool, error) { _, err := v.exec(ctx, func(base string) []string { return []string{"rm", "-rf", base + path} - }) + }, nil) if err != nil { return false, err } @@ -240,8 +248,8 @@ func (v *pvc) VolumeDelete(ctx context.Context, path string) (bool, error) { const pvcDescription = ` This is a Kubernetes persistent volume claim. We create a temporary Kubernetes -pod whenever Wash invokes a currently uncached List/Read/Stream action on it or -one of its children. For List, we run 'find -exec stat' on the pod and parse its -output. For Read, we run 'cat' and return its output. For Stream, we run 'tail -f' -and stream its output. +pod whenever Wash invokes a currently uncached List/Read/Stream/Write action on +it or one of its children. For List, we run 'find -exec stat' on the pod and +parse its output. For Read, we run 'cat' and return its output. For Stream, we +run 'tail -f' and stream its output. ` diff --git a/volume/core.go b/volume/core.go index 4659e1c52..8f39144e9 100644 --- a/volume/core.go +++ b/volume/core.go @@ -9,6 +9,7 @@ package volume import ( "context" "io" + "os" "strings" "sync" "time" @@ -31,6 +32,9 @@ type Interface interface { VolumeRead(ctx context.Context, path string) ([]byte, error) // Accepts a path and streams updates to the content associated with that path. VolumeStream(ctx context.Context, path string) (io.ReadCloser, error) + // Accepts a path and content and writes it to the file associated with that path. + // Mode is provided for Write operations that replace the entire file. + VolumeWrite(ctx context.Context, path string, b []byte, m os.FileMode) error // Deletes the volume node at the specified path. Mirrors plugin.Deletable#Delete VolumeDelete(ctx context.Context, path string) (bool, error) } diff --git a/volume/dir_test.go b/volume/dir_test.go index 2e3fc9d41..67dbf1c9c 100644 --- a/volume/dir_test.go +++ b/volume/dir_test.go @@ -5,6 +5,7 @@ import ( "io" "os" "strings" + "syscall" "testing" "github.com/puppetlabs/wash/datastore" @@ -24,11 +25,15 @@ func (m *mockDirEntry) VolumeList(ctx context.Context, path string) (DirMap, err } func (m *mockDirEntry) VolumeRead(context.Context, string) ([]byte, error) { - return nil, nil + return nil, syscall.ENOTSUP } func (m *mockDirEntry) VolumeStream(context.Context, string) (io.ReadCloser, error) { - return nil, nil + return nil, syscall.ENOTSUP +} + +func (m *mockDirEntry) VolumeWrite(context.Context, string, []byte, os.FileMode) error { + return syscall.ENOTSUP } func (m *mockDirEntry) VolumeDelete(ctx context.Context, path string) (bool, error) { diff --git a/volume/file.go b/volume/file.go index 331af7af9..c636cd89a 100644 --- a/volume/file.go +++ b/volume/file.go @@ -3,6 +3,7 @@ package volume import ( "context" "io" + "os" "time" "github.com/puppetlabs/wash/plugin" @@ -42,6 +43,15 @@ func (v *file) Stream(ctx context.Context) (io.ReadCloser, error) { return v.impl.VolumeStream(ctx, v.path) } +func (v *file) Write(ctx context.Context, b []byte) error { + // Pass mode for Write operations that replace the file. + mode := os.FileMode(0640) + if v.Attributes().HasMode() { + mode = v.Attributes().Mode() + } + return v.impl.VolumeWrite(ctx, v.path, b, mode) +} + func (v *file) Delete(ctx context.Context) (bool, error) { return deleteNode(ctx, v.impl, v.path, v.dirmap) } diff --git a/volume/file_test.go b/volume/file_test.go index cb1a56c69..4bf008338 100644 --- a/volume/file_test.go +++ b/volume/file_test.go @@ -5,6 +5,7 @@ import ( "errors" "io" "io/ioutil" + "os" "strings" "testing" "time" @@ -37,6 +38,14 @@ func (m *mockFileEntry) VolumeStream(context.Context, string) (io.ReadCloser, er return ioutil.NopCloser(strings.NewReader(m.content)), nil } +func (m *mockFileEntry) VolumeWrite(_ context.Context, _ string, b []byte, _ os.FileMode) error { + if m.err != nil { + return m.err + } + m.content = string(b) + return nil +} + func (m *mockFileEntry) VolumeDelete(context.Context, string) (bool, error) { return true, nil } @@ -70,6 +79,11 @@ func TestVolumeFile(t *testing.T) { assert.Equal(t, "hello", string(buf)) } } + + text := "some text" + err = vf.Write(context.Background(), []byte(text)) + assert.NoError(t, err) + assert.Equal(t, text, impl.content) } func TestVolumeFileErr(t *testing.T) { @@ -83,4 +97,7 @@ func TestVolumeFileErr(t *testing.T) { rdr, err := plugin.Stream(context.Background(), vf) assert.Nil(t, rdr) assert.Equal(t, errors.New("fail"), err) + + err = vf.Write(context.Background(), []byte{'a'}) + assert.Equal(t, errors.New("fail"), err) } diff --git a/volume/fs.go b/volume/fs.go index 16064586c..ea00b7016 100644 --- a/volume/fs.go +++ b/volume/fs.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "io" + "os" "strings" "github.com/puppetlabs/wash/activity" @@ -202,6 +203,44 @@ func (d *FS) VolumeStream(ctx context.Context, path string) (io.ReadCloser, erro return r, nil } +// VolumeWrite satisfies the Interface required by Write to write content to a file. +func (d *FS) VolumeWrite(ctx context.Context, path string, b []byte, _ os.FileMode) error { + activity.Record(ctx, "Writing %v bytes to %v on %v", len(b), path, plugin.ID(d.executor)) + command := d.selectShellCommand([]string{"cp", "/dev/stdin", path}, []string{"$input | Set-Content '" + path + "'"}) + + // Don't use Tty when writing file content because it may convert LF to CRLF. + // Use Elevate because it's common to login to systems as a non-root user and sudo. + opts := plugin.ExecOptions{Elevate: true, Stdin: bytes.NewReader(b)} + cmd, err := plugin.Exec(ctx, d.executor, command[0], command[1:], opts) + if err != nil { + return err + } + + var output bytes.Buffer + var errs []error + for chunk := range cmd.OutputCh() { + if chunk.Err != nil { + errs = append(errs, chunk.Err) + } else { + activity.Record(ctx, "%v: %v", chunk.StreamID, chunk.Data) + fmt.Fprint(&output, chunk.Data) + } + } + + if len(errs) > 0 { + return fmt.Errorf("Exec errors on %v in VolumeWrite: %v", path, errs) + } + + exitcode, err := cmd.ExitCode() + if err != nil { + return fmt.Errorf("Exec error on %v in VolumeWrite: %v", path, err) + } else if exitcode != 0 { + // Can happen due to permission denied. Leave handling up to the caller. + return nonZeroError{cmdline: command, stderr: strings.TrimSpace(output.String()), exitcode: exitcode} + } + return nil +} + // VolumeDelete satisfies the Interface required by Delete to delete volume nodes. func (d *FS) VolumeDelete(ctx context.Context, path string) (bool, error) { activity.Record(ctx, "Deleting %v on %v", path, plugin.ID(d.executor)) diff --git a/volume/fs_test.go b/volume/fs_test.go index a3e475a17..ef2b77e2d 100644 --- a/volume/fs_test.go +++ b/volume/fs_test.go @@ -1,6 +1,7 @@ package volume import ( + "bytes" "context" "fmt" "sort" @@ -69,14 +70,14 @@ const ( type fsTestSuite struct { suite.Suite - ctx context.Context - cancelFunc context.CancelFunc - loginShell plugin.Shell - statCmd func(path string, maxdepth int) []string - outputFixture string - outputDepth int - shortFixture, deepFixture string - readCmdFn, deleteCmdFn func(path string) (command []string) + ctx context.Context + cancelFunc context.CancelFunc + loginShell plugin.Shell + statCmd func(path string, maxdepth int) []string + outputFixture string + outputDepth int + shortFixture, deepFixture string + readCmdFn, writeCmdFn, deleteCmdFn func(path string) (command []string) } func (suite *fsTestSuite) SetupTest() { @@ -249,6 +250,25 @@ func (suite *fsTestSuite) TestFSRead() { exec.AssertExpectations(suite.T()) } +func (suite *fsTestSuite) TestFSWrite() { + exec := suite.createExec() + exec.onExec(suite.statCmd("/", suite.outputDepth), suite.createResult(suite.outputFixture)) + + fs := NewFS(suite.ctx, "fs", exec, suite.outputDepth) + + entry := suite.find(fs, "var/log/path1/a file") + suite.Equal("a file", plugin.Name(entry)) + + data := []byte("data") + cmd := suite.writeCmdFn("/var/log/path1/a file") + opts := plugin.ExecOptions{Elevate: true, Stdin: bytes.NewReader(data)} + exec.On("Exec", mock.Anything, cmd[0], cmd[1:], opts).Return(suite.createResult(""), nil) + + err := entry.(plugin.Writable).Write(suite.ctx, data) + suite.NoError(err) + exec.AssertExpectations(suite.T()) +} + func (suite *fsTestSuite) TestVolumeDelete() { exec := suite.createExec() exec.onExec(suite.statCmd("/", suite.outputDepth), suite.createResult(suite.outputFixture)) @@ -270,6 +290,7 @@ func TestPOSIXFS(t *testing.T) { shortFixture: posixFixtureShort, deepFixture: posixFixtureDeep, readCmdFn: func(path string) []string { return []string{"cat", path} }, + writeCmdFn: func(path string) []string { return []string{"cp", "/dev/stdin", path} }, deleteCmdFn: func(path string) []string { return []string{"rm", "-rf", path} }, }) } @@ -283,6 +304,7 @@ func TestPowershellFS(t *testing.T) { shortFixture: powershellFixtureShort, deepFixture: powershellFixtureDeep, readCmdFn: func(path string) []string { return []string{"Get-Content '" + path + "'"} }, + writeCmdFn: func(path string) []string { return []string{"$input | Set-Content '" + path + "'"} }, deleteCmdFn: func(path string) []string { return []string{"Remove-Item -Recurse -Force '" + path + "'"} }, }) }