Skip to content

Commit

Permalink
Fixes for kando chronicle push (#6235)
Browse files Browse the repository at this point in the history
  • Loading branch information
tdmanv authored and Ilya Kislenko committed Aug 8, 2019
1 parent 8684b3e commit cafe75c
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 21 deletions.
19 changes: 11 additions & 8 deletions pkg/chronicle/chronicle_push.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (p PushParams) Validate() error {
}

func Push(p PushParams) error {
log.Infof("%#v", p)
log.Debugf("%#v", p)
ctx := setupSignalHandler(context.Background())
var i int
for {
Expand Down Expand Up @@ -74,28 +74,31 @@ func push(ctx context.Context, p PushParams, ord int) error {
// Read profile.
prof, ok, err := readProfile(p.ProfilePath)
if !ok || err != nil {
return errors.Wrap(err, "")
return err
}

// Get envdir values if set.
var env []string
if p.EnvDir != "" {
var err error
envdir.EnvDir(p.EnvDir)
env, err = envdir.EnvDir(p.EnvDir)
if err != nil {
return err
}
}
log.Debugf("Pushing output from Command %d: %v. Environment: %v", ord, p.Command, env)
return pushWithEnv(ctx, p.Command, p.ArtifactPath, ord, prof, env)
}

func pushWithEnv(ctx context.Context, c []string, suffix string, ord int, prof param.Profile, env []string) error {
// Chronicle command w/ piped output.
cmd := exec.CommandContext(ctx, "sh", "-c", strings.Join(p.Command, " "))
cmd := exec.CommandContext(ctx, "sh", "-c", strings.Join(c, " "))
cmd.Env = append(cmd.Env, env...)
out, err := cmd.StdoutPipe()
if err != nil {
return errors.Wrap(err, "Failed to open command pipe")
}
cmd.Stderr = os.Stderr
cur := fmt.Sprintf("%s-%d", p.ArtifactPath, ord)
cur := fmt.Sprintf("%s-%d", suffix, ord)
// Write data to object store
if err := cmd.Start(); err != nil {
return errors.Wrap(err, "Failed to start chronicle pipe command")
Expand All @@ -109,11 +112,11 @@ func push(ctx context.Context, p PushParams, ord int) error {

// Write manifest pointing to new data
man := strings.NewReader(cur)
if err := location.Write(ctx, man, prof, p.ArtifactPath); err != nil {
if err := location.Write(ctx, man, prof, suffix); err != nil {
return errors.Wrap(err, "Failed to write command output to object storage")
}
// Delete old data
prev := fmt.Sprintf("%s-%d", p.ArtifactPath, ord-1)
prev := fmt.Sprintf("%s-%d", suffix, ord-1)
location.Delete(ctx, prof, prev)
return nil
}
Expand Down
37 changes: 30 additions & 7 deletions pkg/chronicle/chronicle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,32 @@ import (

crv1alpha1 "github.com/kanisterio/kanister/pkg/apis/cr/v1alpha1"
"github.com/kanisterio/kanister/pkg/objectstore"
"github.com/kanisterio/kanister/pkg/param"
"github.com/kanisterio/kanister/pkg/testutil"
)

// Hook up gocheck into the "go test" runner.
func Test(t *testing.T) { TestingT(t) }

type ChronicleSuite struct{}
type ChronicleSuite struct {
profile param.Profile
}

var _ = Suite(&ChronicleSuite{})

func (s *ChronicleSuite) TestPushPull(c *C) {
func (s *ChronicleSuite) SetUpSuite(c *C) {
osType := objectstore.ProviderTypeS3
loc := crv1alpha1.Location{
Type: crv1alpha1.LocationTypeS3Compliant,
Region: testutil.TestS3Region,
Bucket: testutil.TestS3BucketName,
}
prof := *testutil.ObjectStoreProfileOrSkip(c, osType, loc)
s.profile = *testutil.ObjectStoreProfileOrSkip(c, osType, loc)
}

func (s *ChronicleSuite) TestPushPull(c *C) {
pp := filepath.Join(c.MkDir(), "profile.json")
err := writeProfile(pp, prof)
err := writeProfile(pp, s.profile)
c.Assert(err, IsNil)

p := PushParams{
Expand All @@ -50,12 +56,29 @@ func (s *ChronicleSuite) TestPushPull(c *C) {

// Pull and check that we still get i
buf := bytes.NewBuffer(nil)
err = Pull(ctx, buf, prof, p.ArtifactPath)
err = Pull(ctx, buf, s.profile, p.ArtifactPath)
c.Assert(err, IsNil)
s, err := ioutil.ReadAll(buf)
str, err := ioutil.ReadAll(buf)
c.Assert(err, IsNil)
// Remove additional '\n'
t := strings.TrimSuffix(string(s), "\n")
t := strings.TrimSuffix(string(str), "\n")
c.Assert(t, Equals, strconv.Itoa(i))
}
}

func (s *ChronicleSuite) TestEnv(c *C) {
ctx := context.Background()
cmd := []string{"echo", "X:", "$X"}
suffix := c.TestName() + rand.String(5)
env := []string{"X=foo"}

err := pushWithEnv(ctx, cmd, suffix, 0, s.profile, env)
c.Assert(err, IsNil)
buf := bytes.NewBuffer(nil)
err = Pull(ctx, buf, s.profile, suffix)
c.Assert(err, IsNil)
str, err := ioutil.ReadAll(buf)
c.Assert(err, IsNil)
t := strings.TrimSuffix(string(str), "\n")
c.Assert(t, Equals, "X: foo")
}
2 changes: 1 addition & 1 deletion pkg/envdir/envdir.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func EnvDir(dir string) ([]string, error) {
}
e := make([]string, 0, len(fis))
for _, fi := range fis {
if fi.IsDir() {
if fi.IsDir() || fi.Mode()&os.ModeSymlink == os.ModeSymlink {
continue
}
p := filepath.Join(dir, fi.Name())
Expand Down
1 change: 1 addition & 0 deletions pkg/kando/chronicle.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@ func newChronicleCommand() *cobra.Command {
Short: "Manage periodic output streams in object storage",
}
cmd.AddCommand(newChroniclePushCommand())
cmd.AddCommand(newChroniclePullCommand())
return cmd
}
6 changes: 3 additions & 3 deletions pkg/kando/chronicle_pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ import (
func newChroniclePullCommand() *cobra.Command {
params := locationParams{}
cmd := &cobra.Command{
Use: "push <command>",
Short: "Periodically push the output of a command to object storage",
Use: "pull <command>",
Short: "Pull the data referenced by a chronicle manifest",
Args: cobra.ExactArgs(1),
RunE: func(c *cobra.Command, args []string) error {
return runChroniclePull(c, params, args[0])
},
}
cmd.PersistentFlags().StringVarP(&params.suffix, pathFlagName, "s", "", "Specify a path suffix (optional)")
cmd.PersistentFlags().StringVarP(&params.profile, pathFlagName, "p", "", "Pass a Profile as a JSON string (required)")
cmd.PersistentFlags().StringVarP(&params.profile, profileFlagName, "p", "", "Pass a Profile as a JSON string (required)")
cmd.MarkPersistentFlagRequired(profileFlagName)
return cmd
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/kando/chronicle_push.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ func newChroniclePushCommand() *cobra.Command {
return chronicle.Push(params)
},
}
cmd.PersistentFlags().StringVarP(&params.ProfilePath, profilePathFlagName, "s", "", "Specify a path suffix (optional)")
cmd.PersistentFlags().StringVarP(&params.ProfilePath, profilePathFlagName, "p", "", "Path to a Profile as a JSON string (required)")
cmd.MarkPersistentFlagRequired(profilePathFlagName)
cmd.PersistentFlags().StringVarP(&params.ArtifactPath, artifactPathFlagName, "p", "", "Path to a Profile as a JSON string (required)")
cmd.PersistentFlags().StringVarP(&params.ArtifactPath, artifactPathFlagName, "s", "", "Specify a path suffix (optional)")
cmd.PersistentFlags().StringVarP(&params.EnvDir, envDirFlagName, "e", "", "Get environment variables from a (optional)")
cmd.PersistentFlags().DurationVarP(&params.Frequency, frequencyFlagName, "f", time.Minute, "The Frequency to push to object storage ")
return cmd
Expand Down
17 changes: 17 additions & 0 deletions pkg/kando/kando.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kando
import (
"os"

"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"

Expand All @@ -26,8 +27,24 @@ func newRootCommand() *cobra.Command {
Short: "A set of tools used from Kanister Blueprints",
Version: version.VersionString(),
}

var v string
rootCmd.PersistentFlags().StringVarP(&v, "verbosity", "v", log.WarnLevel.String(), "Log level (debug, info, warn, error, fatal, panic)")
rootCmd.PersistentPreRunE = func(*cobra.Command, []string) error {
return setLogLevel(v)
}

rootCmd.AddCommand(newLocationCommand())
rootCmd.AddCommand(newOutputCommand())
rootCmd.AddCommand(newChronicleCommand())
return rootCmd
}

func setLogLevel(v string) error {
l, err := log.ParseLevel(v)
if err != nil {
return errors.Wrap(err, "Invalid log level: "+v)
}
log.SetLevel(l)
return nil
}

0 comments on commit cafe75c

Please sign in to comment.