Skip to content

Commit

Permalink
Implement Writable for the Volume plugin
Browse files Browse the repository at this point in the history
Volumes now support writing content to files.

Resolves puppetlabs-toy-chest#675.

Signed-off-by: Michael Smith <michael.smith@puppet.com>
  • Loading branch information
MikaelSmith committed Apr 16, 2020
1 parent 80d3675 commit 851370d
Show file tree
Hide file tree
Showing 8 changed files with 165 additions and 23 deletions.
45 changes: 41 additions & 4 deletions plugin/docker/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"errors"
"io"
"io/ioutil"
"os"
"path/filepath"
"strings"
"time"

Expand Down Expand Up @@ -75,10 +77,9 @@ func (v *volume) createContainer(ctx context.Context, cmd []string) (string, fun
// Use tty to avoid messing with the extra log formatting.
cfg := docontainer.Config{Image: "busybox", Cmd: cmd, Tty: true}
mounts := []mount.Mount{{
Type: mount.TypeVolume,
Source: v.Name(),
Target: mountpoint,
ReadOnly: true,
Type: mount.TypeVolume,
Source: v.Name(),
Target: mountpoint,
}}
hostcfg := docontainer.HostConfig{Mounts: mounts}
netcfg := network.NetworkingConfig{}
Expand Down Expand Up @@ -232,6 +233,42 @@ func (v *volume) VolumeStream(ctx context.Context, path string) (io.ReadCloser,
return plugin.CleanupReader{ReadCloser: output, Cleanup: cleanup}, nil
}

func (v *volume) VolumeWrite(ctx context.Context, path string, b []byte, mode os.FileMode) error {
// Create a container that mounts a volume and waits. Use it to upload a file.
cid, cleanup, err := v.createContainer(ctx, []string{"sleep", "60"})
if err != nil {
return err
}
defer cleanup()

// Create a tar of the file contents and upload it.
dir, file := filepath.Split(path)
var buf bytes.Buffer
tw := tar.NewWriter(&buf)
mtime := time.Now()
hdr := tar.Header{
Name: file,
Size: int64(len(b)),
Mode: int64(mode),
// Use PAX format to ensure compatibility with non-ASCII filenames.
Format: tar.FormatPAX,
// Use of PAX requires we set atime/ctime/mtime. Use now, we just read the file to update it.
AccessTime: mtime,
ChangeTime: mtime,
ModTime: mtime,
}

if err := tw.WriteHeader(&hdr); err != nil {
return err
} else if _, err := tw.Write(b); err != nil {
return err
} else if err := tw.Close(); err != nil {
return err
}

return v.client.CopyToContainer(ctx, cid, mountpoint+dir, &buf, types.CopyToContainerOptions{})
}

func (v *volume) VolumeDelete(ctx context.Context, path string) (bool, error) {
_, err := v.runInTemporaryContainer(ctx, []string{"rm", "-rf", mountpoint + path})
if err != nil {
Expand Down
26 changes: 17 additions & 9 deletions plugin/kubernetes/pvc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"io"
"os"

"github.com/puppetlabs/wash/activity"
"github.com/puppetlabs/wash/plugin"
Expand Down Expand Up @@ -156,15 +157,15 @@ func (v *pvc) inContainer(ctx context.Context, fn containerCb) (interface{}, err
// to inject a base path.
type cmdBuilder func(string) []string

func (v *pvc) exec(ctx context.Context, buildCmd cmdBuilder) ([]byte, error) {
func (v *pvc) exec(ctx context.Context, buildCmd cmdBuilder, stdin io.Reader) ([]byte, error) {
obj, err := v.inContainer(ctx, func(c *containerBase, mountpoint string, cleanup func()) (interface{}, error) {
defer cleanup()

cmd := buildCmd(mountpoint)
activity.Record(ctx, "Executing in %v: %v", c, cmd)

var stdout, stderr bytes.Buffer
streamOpts := remotecommand.StreamOptions{Stdout: &stdout, Stderr: &stderr}
streamOpts := remotecommand.StreamOptions{Stdout: &stdout, Stderr: &stderr, Stdin: stdin}
executor, err := c.newExecutor(ctx, cmd[0], cmd[1:], streamOpts)
if err != nil {
return []byte{}, err
Expand All @@ -185,7 +186,7 @@ func (v *pvc) VolumeList(ctx context.Context, path string) (volume.DirMap, error
output, err := v.exec(ctx, func(base string) []string {
mountpoint = base
return volume.StatCmdPOSIX(base+path, maxdepth)
})
}, nil)

if err != nil {
return nil, err
Expand All @@ -196,7 +197,7 @@ func (v *pvc) VolumeList(ctx context.Context, path string) (volume.DirMap, error
func (v *pvc) VolumeRead(ctx context.Context, path string) ([]byte, error) {
output, err := v.exec(ctx, func(base string) []string {
return []string{"cat", base + path}
})
}, nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -228,10 +229,17 @@ func (v *pvc) VolumeStream(ctx context.Context, path string) (io.ReadCloser, err
return obj.(io.ReadCloser), err
}

func (v *pvc) VolumeWrite(ctx context.Context, path string, b []byte, _ os.FileMode) error {
_, err := v.exec(ctx, func(base string) []string {
return []string{"cp", "/dev/stdin", base + path}
}, bytes.NewReader(b))
return err
}

func (v *pvc) VolumeDelete(ctx context.Context, path string) (bool, error) {
_, err := v.exec(ctx, func(base string) []string {
return []string{"rm", "-rf", base + path}
})
}, nil)
if err != nil {
return false, err
}
Expand All @@ -240,8 +248,8 @@ func (v *pvc) VolumeDelete(ctx context.Context, path string) (bool, error) {

const pvcDescription = `
This is a Kubernetes persistent volume claim. We create a temporary Kubernetes
pod whenever Wash invokes a currently uncached List/Read/Stream action on it or
one of its children. For List, we run 'find -exec stat' on the pod and parse its
output. For Read, we run 'cat' and return its output. For Stream, we run 'tail -f'
and stream its output.
pod whenever Wash invokes a currently uncached List/Read/Stream/Write action on
it or one of its children. For List, we run 'find -exec stat' on the pod and
parse its output. For Read, we run 'cat' and return its output. For Stream, we
run 'tail -f' and stream its output.
`
4 changes: 4 additions & 0 deletions volume/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package volume
import (
"context"
"io"
"os"
"strings"
"sync"
"time"
Expand All @@ -31,6 +32,9 @@ type Interface interface {
VolumeRead(ctx context.Context, path string) ([]byte, error)
// Accepts a path and streams updates to the content associated with that path.
VolumeStream(ctx context.Context, path string) (io.ReadCloser, error)
// Accepts a path and content and writes it to the file associated with that path.
// Mode is provided for Write operations that replace the entire file.
VolumeWrite(ctx context.Context, path string, b []byte, m os.FileMode) error
// Deletes the volume node at the specified path. Mirrors plugin.Deletable#Delete
VolumeDelete(ctx context.Context, path string) (bool, error)
}
Expand Down
9 changes: 7 additions & 2 deletions volume/dir_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"io"
"os"
"strings"
"syscall"
"testing"

"github.com/puppetlabs/wash/datastore"
Expand All @@ -24,11 +25,15 @@ func (m *mockDirEntry) VolumeList(ctx context.Context, path string) (DirMap, err
}

func (m *mockDirEntry) VolumeRead(context.Context, string) ([]byte, error) {
return nil, nil
return nil, syscall.ENOTSUP
}

func (m *mockDirEntry) VolumeStream(context.Context, string) (io.ReadCloser, error) {
return nil, nil
return nil, syscall.ENOTSUP
}

func (m *mockDirEntry) VolumeWrite(context.Context, string, []byte, os.FileMode) error {
return syscall.ENOTSUP
}

func (m *mockDirEntry) VolumeDelete(ctx context.Context, path string) (bool, error) {
Expand Down
10 changes: 10 additions & 0 deletions volume/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package volume
import (
"context"
"io"
"os"
"time"

"github.com/puppetlabs/wash/plugin"
Expand Down Expand Up @@ -42,6 +43,15 @@ func (v *file) Stream(ctx context.Context) (io.ReadCloser, error) {
return v.impl.VolumeStream(ctx, v.path)
}

func (v *file) Write(ctx context.Context, b []byte) error {
// Pass mode for Write operations that replace the file.
mode := os.FileMode(0640)
if v.Attributes().HasMode() {
mode = v.Attributes().Mode()
}
return v.impl.VolumeWrite(ctx, v.path, b, mode)
}

func (v *file) Delete(ctx context.Context) (bool, error) {
return deleteNode(ctx, v.impl, v.path, v.dirmap)
}
Expand Down
17 changes: 17 additions & 0 deletions volume/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"io"
"io/ioutil"
"os"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -37,6 +38,14 @@ func (m *mockFileEntry) VolumeStream(context.Context, string) (io.ReadCloser, er
return ioutil.NopCloser(strings.NewReader(m.content)), nil
}

func (m *mockFileEntry) VolumeWrite(_ context.Context, _ string, b []byte, _ os.FileMode) error {
if m.err != nil {
return m.err
}
m.content = string(b)
return nil
}

func (m *mockFileEntry) VolumeDelete(context.Context, string) (bool, error) {
return true, nil
}
Expand Down Expand Up @@ -70,6 +79,11 @@ func TestVolumeFile(t *testing.T) {
assert.Equal(t, "hello", string(buf))
}
}

text := "some text"
err = vf.Write(context.Background(), []byte(text))
assert.NoError(t, err)
assert.Equal(t, text, impl.content)
}

func TestVolumeFileErr(t *testing.T) {
Expand All @@ -83,4 +97,7 @@ func TestVolumeFileErr(t *testing.T) {
rdr, err := plugin.Stream(context.Background(), vf)
assert.Nil(t, rdr)
assert.Equal(t, errors.New("fail"), err)

err = vf.Write(context.Background(), []byte{'a'})
assert.Equal(t, errors.New("fail"), err)
}
39 changes: 39 additions & 0 deletions volume/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"fmt"
"io"
"os"
"strings"

"github.com/puppetlabs/wash/activity"
Expand Down Expand Up @@ -202,6 +203,44 @@ func (d *FS) VolumeStream(ctx context.Context, path string) (io.ReadCloser, erro
return r, nil
}

// VolumeWrite satisfies the Interface required by Write to write content to a file.
func (d *FS) VolumeWrite(ctx context.Context, path string, b []byte, _ os.FileMode) error {
activity.Record(ctx, "Writing %v bytes to %v on %v", len(b), path, plugin.ID(d.executor))
command := d.selectShellCommand([]string{"cp", "/dev/stdin", path}, []string{"$input | Set-Content '" + path + "'"})

// Don't use Tty when writing file content because it may convert LF to CRLF.
// Use Elevate because it's common to login to systems as a non-root user and sudo.
opts := plugin.ExecOptions{Elevate: true, Stdin: bytes.NewReader(b)}
cmd, err := plugin.Exec(ctx, d.executor, command[0], command[1:], opts)
if err != nil {
return err
}

var output bytes.Buffer
var errs []error
for chunk := range cmd.OutputCh() {
if chunk.Err != nil {
errs = append(errs, chunk.Err)
} else {
activity.Record(ctx, "%v: %v", chunk.StreamID, chunk.Data)
fmt.Fprint(&output, chunk.Data)
}
}

if len(errs) > 0 {
return fmt.Errorf("Exec errors on %v in VolumeWrite: %v", path, errs)
}

exitcode, err := cmd.ExitCode()
if err != nil {
return fmt.Errorf("Exec error on %v in VolumeWrite: %v", path, err)
} else if exitcode != 0 {
// Can happen due to permission denied. Leave handling up to the caller.
return nonZeroError{cmdline: command, stderr: strings.TrimSpace(output.String()), exitcode: exitcode}
}
return nil
}

// VolumeDelete satisfies the Interface required by Delete to delete volume nodes.
func (d *FS) VolumeDelete(ctx context.Context, path string) (bool, error) {
activity.Record(ctx, "Deleting %v on %v", path, plugin.ID(d.executor))
Expand Down
38 changes: 30 additions & 8 deletions volume/fs_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package volume

import (
"bytes"
"context"
"fmt"
"sort"
Expand Down Expand Up @@ -69,14 +70,14 @@ const (

type fsTestSuite struct {
suite.Suite
ctx context.Context
cancelFunc context.CancelFunc
loginShell plugin.Shell
statCmd func(path string, maxdepth int) []string
outputFixture string
outputDepth int
shortFixture, deepFixture string
readCmdFn, deleteCmdFn func(path string) (command []string)
ctx context.Context
cancelFunc context.CancelFunc
loginShell plugin.Shell
statCmd func(path string, maxdepth int) []string
outputFixture string
outputDepth int
shortFixture, deepFixture string
readCmdFn, writeCmdFn, deleteCmdFn func(path string) (command []string)
}

func (suite *fsTestSuite) SetupTest() {
Expand Down Expand Up @@ -249,6 +250,25 @@ func (suite *fsTestSuite) TestFSRead() {
exec.AssertExpectations(suite.T())
}

func (suite *fsTestSuite) TestFSWrite() {
exec := suite.createExec()
exec.onExec(suite.statCmd("/", suite.outputDepth), suite.createResult(suite.outputFixture))

fs := NewFS(suite.ctx, "fs", exec, suite.outputDepth)

entry := suite.find(fs, "var/log/path1/a file")
suite.Equal("a file", plugin.Name(entry))

data := []byte("data")
cmd := suite.writeCmdFn("/var/log/path1/a file")
opts := plugin.ExecOptions{Elevate: true, Stdin: bytes.NewReader(data)}
exec.On("Exec", mock.Anything, cmd[0], cmd[1:], opts).Return(suite.createResult(""), nil)

err := entry.(plugin.Writable).Write(suite.ctx, data)
suite.NoError(err)
exec.AssertExpectations(suite.T())
}

func (suite *fsTestSuite) TestVolumeDelete() {
exec := suite.createExec()
exec.onExec(suite.statCmd("/", suite.outputDepth), suite.createResult(suite.outputFixture))
Expand All @@ -270,6 +290,7 @@ func TestPOSIXFS(t *testing.T) {
shortFixture: posixFixtureShort,
deepFixture: posixFixtureDeep,
readCmdFn: func(path string) []string { return []string{"cat", path} },
writeCmdFn: func(path string) []string { return []string{"cp", "/dev/stdin", path} },
deleteCmdFn: func(path string) []string { return []string{"rm", "-rf", path} },
})
}
Expand All @@ -283,6 +304,7 @@ func TestPowershellFS(t *testing.T) {
shortFixture: powershellFixtureShort,
deepFixture: powershellFixtureDeep,
readCmdFn: func(path string) []string { return []string{"Get-Content '" + path + "'"} },
writeCmdFn: func(path string) []string { return []string{"$input | Set-Content '" + path + "'"} },
deleteCmdFn: func(path string) []string { return []string{"Remove-Item -Recurse -Force '" + path + "'"} },
})
}
Expand Down

0 comments on commit 851370d

Please sign in to comment.