Skip to content

Commit

Permalink
Merge pull request #189 from kanisterio/sync
Browse files Browse the repository at this point in the history
Add Chronicle CLI; RestoreData in parallel; Add Kind to Makefile
  • Loading branch information
SupriyaKasten committed Aug 6, 2019
2 parents 2dc827a + bfae54f commit ed7af31
Show file tree
Hide file tree
Showing 21 changed files with 776 additions and 128 deletions.
9 changes: 8 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ push-name:
version:
@echo $(VERSION)

.PHONY: deploy test codegen build-dirs run clean container-clean bin-clean vendor-clean docs
.PHONY: deploy test codegen build-dirs run clean container-clean bin-clean vendor-clean docs start-kind stop-kind

deploy: release-controller .deploy-$(DOTFILE_IMAGE)
.deploy-$(DOTFILE_IMAGE):
Expand Down Expand Up @@ -201,6 +201,7 @@ ifeq ($(DOCKER_BUILD),"true")
-v "$(PWD):/go/src/$(PKG)" \
-v "$(PWD)/bin/$(ARCH)/$$(go env GOOS)_$(ARCH):/go/bin" \
-v "$(PWD)/.go/std/$(ARCH):/usr/local/go/pkg/linux_$(ARCH)_static" \
-v /var/run/docker.sock:/var/run/docker.sock \
-w /go/src/$(PKG) \
$(BUILD_IMAGE) \
/bin/sh $(CMD)
Expand Down Expand Up @@ -235,3 +236,9 @@ release-helm:

release-kanctl:
@$(MAKE) run CMD='-c "./build/release_kanctl.sh"'

start-kind:
@$(MAKE) run CMD='-c "./build/local_kubernetes.sh start_localkube"'

stop-kind:
@$(MAKE) run CMD='-c "./build/local_kubernetes.sh stop_localkube"'
4 changes: 2 additions & 2 deletions build/local_kubernetes.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export KUBECONFIG=$HOME/.kube/config
export KUBE_VERSION=${KUBE_VERSION:-"v1.13.7"}
export KIND_VERSION=${KIND_VERSION:-"v0.4.0"}
export LOCAL_CLUSTER_NAME=${LOCAL_CLUSTER_NAME:-"kanister"}
declare -a REQUIRED_BINS=( docker sudo jq go )
declare -a REQUIRED_BINS=( docker jq go )

if command -v apt-get
then
Expand Down Expand Up @@ -65,7 +65,6 @@ stop_localkube() {
}

get_localkube() {
check_or_get_dependencies
mkdir $HOME/.kube || true
touch $HOME/.kube/config
GO111MODULE="on" go get sigs.k8s.io/kind@${KIND_VERSION}
Expand Down Expand Up @@ -128,6 +127,7 @@ EOM
}

[ ${#@} -gt 0 ] || usage
check_or_get_dependencies
case "${1}" in
# Alphabetically sorted
get_localkube)
Expand Down
21 changes: 0 additions & 21 deletions pkg/chronicle/chronicle.go

This file was deleted.

24 changes: 24 additions & 0 deletions pkg/chronicle/chronicle_pull.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package chronicle

import (
"bytes"
"context"
"io"
"io/ioutil"

"github.com/kanisterio/kanister/pkg/location"
"github.com/kanisterio/kanister/pkg/param"
"github.com/pkg/errors"
)

func Pull(ctx context.Context, target io.Writer, p param.Profile, manifest string) error {
// Read manifest
buf := bytes.NewBuffer(nil)
location.Read(ctx, buf, p, manifest)
// Read Data
data, err := ioutil.ReadAll(buf)
if err != nil {
return errors.Wrap(err, "Could not read chronicle manifest")
}
return location.Read(ctx, target, p, string(data))
}
147 changes: 147 additions & 0 deletions pkg/chronicle/chronicle_push.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package chronicle

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"os/exec"
"os/signal"
"strings"
"syscall"
"time"

"github.com/pkg/errors"
log "github.com/sirupsen/logrus"

"github.com/kanisterio/kanister/pkg/envdir"
"github.com/kanisterio/kanister/pkg/location"
"github.com/kanisterio/kanister/pkg/param"
)

type PushParams struct {
ProfilePath string
ArtifactPath string
Frequency time.Duration
EnvDir string
Command []string
}

func (p PushParams) Validate() error {
return nil
}

func Push(p PushParams) error {
log.Infof("%#v", p)
ctx := setupSignalHandler(context.Background())
var i int
for {
start := time.Now().UTC()

if err := push(ctx, p, i); err != nil {
return err
}

end := time.Now().UTC()
sleep := p.Frequency - end.Sub(start)
select {
case <-ctx.Done():
return nil
case <-time.After(sleep):
}
i++
}
}

func setupSignalHandler(ctx context.Context) context.Context {
var can context.CancelFunc
ctx, can = context.WithCancel(ctx)
c := make(chan os.Signal, 2)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
<-c
log.Infof("Shutting down process")
can()
<-c
log.Infof("Killing process")
os.Exit(1)
}()
return ctx
}

func push(ctx context.Context, p PushParams, ord int) error {
// Read profile.
prof, ok, err := readProfile(p.ProfilePath)
if !ok || err != nil {
return errors.Wrap(err, "")
}

// Get envdir values if set.
var env []string
if p.EnvDir != "" {
var err error
envdir.EnvDir(p.EnvDir)
if err != nil {
return err
}
}

// Chronicle command w/ piped output.
cmd := exec.CommandContext(ctx, "sh", "-c", strings.Join(p.Command, " "))
cmd.Env = append(cmd.Env, env...)
out, err := cmd.StdoutPipe()
if err != nil {
return errors.Wrap(err, "Failed to open command pipe")
}
cmd.Stderr = os.Stderr
cur := fmt.Sprintf("%s-%d", p.ArtifactPath, ord)
// Write data to object store
if err := cmd.Start(); err != nil {
return errors.Wrap(err, "Failed to start chronicle pipe command")
}
if err := location.Write(ctx, out, prof, cur); err != nil {
return errors.Wrap(err, "Failed to write command output to object storage")
}
if err := cmd.Wait(); err != nil {
return errors.Wrap(err, "Chronicle pipe command failed")
}

// Write manifest pointing to new data
man := strings.NewReader(cur)
if err := location.Write(ctx, man, prof, p.ArtifactPath); err != nil {
return errors.Wrap(err, "Failed to write command output to object storage")
}
// Delete old data
prev := fmt.Sprintf("%s-%d", p.ArtifactPath, ord-1)
location.Delete(ctx, prof, prev)
return nil
}

func readProfile(path string) (p param.Profile, ok bool, err error) {
var buf []byte
buf, err = ioutil.ReadFile(path)
switch {
case os.IsNotExist(err):
ok = true
err = nil
return
case err != nil:
err = errors.Wrap(err, "Failed to read profile")
return
}
if err = json.Unmarshal(buf, &p); err != nil {
err = errors.Wrap(err, "Failed to unmarshal profile")
} else {
ok = true
}
return
}

func writeProfile(path string, p param.Profile) error {
buf, err := json.Marshal(p)
if err != nil {
return errors.Wrap(err, "Failed to write profile")
}
return ioutil.WriteFile(path, buf, os.ModePerm)
}
39 changes: 39 additions & 0 deletions pkg/chronicle/chronicle_push_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package chronicle

import (
"context"
"path/filepath"

. "gopkg.in/check.v1"
"k8s.io/apimachinery/pkg/util/rand"

crv1alpha1 "github.com/kanisterio/kanister/pkg/apis/cr/v1alpha1"
"github.com/kanisterio/kanister/pkg/objectstore"
"github.com/kanisterio/kanister/pkg/testutil"
)

type ChroniclePushSuite struct{}

var _ = Suite(&ChroniclePushSuite{})

func (s *ChroniclePushSuite) TestPush(c *C) {
osType := objectstore.ProviderTypeS3
loc := crv1alpha1.Location{
Type: crv1alpha1.LocationTypeS3Compliant,
Region: testutil.TestS3Region,
Bucket: testutil.TestS3BucketName,
}
prof := *testutil.ObjectStoreProfileOrSkip(c, osType, loc)
pp := filepath.Join(c.MkDir(), "profile.json")
err := writeProfile(pp, prof)
c.Assert(err, IsNil)

p := PushParams{
ProfilePath: pp,
ArtifactPath: rand.String(10),
Command: []string{"echo hello"},
}
ctx := context.Background()
err = push(ctx, p, 0)
c.Assert(err, IsNil)
}
61 changes: 61 additions & 0 deletions pkg/chronicle/chronicle_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package chronicle

import (
"bytes"
"context"
"io/ioutil"
"path/filepath"
"strconv"
"strings"
"testing"

. "gopkg.in/check.v1"
"k8s.io/apimachinery/pkg/util/rand"

crv1alpha1 "github.com/kanisterio/kanister/pkg/apis/cr/v1alpha1"
"github.com/kanisterio/kanister/pkg/objectstore"
"github.com/kanisterio/kanister/pkg/testutil"
)

// Hook up gocheck into the "go test" runner.
func Test(t *testing.T) { TestingT(t) }

type ChronicleSuite struct{}

var _ = Suite(&ChronicleSuite{})

func (s *ChronicleSuite) TestPushPull(c *C) {
osType := objectstore.ProviderTypeS3
loc := crv1alpha1.Location{
Type: crv1alpha1.LocationTypeS3Compliant,
Region: testutil.TestS3Region,
Bucket: testutil.TestS3BucketName,
}
prof := *testutil.ObjectStoreProfileOrSkip(c, osType, loc)
pp := filepath.Join(c.MkDir(), "profile.json")
err := writeProfile(pp, prof)
c.Assert(err, IsNil)

p := PushParams{
ProfilePath: pp,
ArtifactPath: rand.String(10),
}
ctx := context.Background()

for i := range make([]struct{}, 5) {
// Write i to bucket
p.Command = []string{"echo", strconv.Itoa(i)}
err = push(ctx, p, i)
c.Assert(err, IsNil)

// Pull and check that we still get i
buf := bytes.NewBuffer(nil)
err = Pull(ctx, buf, prof, p.ArtifactPath)
c.Assert(err, IsNil)
s, err := ioutil.ReadAll(buf)
c.Assert(err, IsNil)
// Remove additional '\n'
t := strings.TrimSuffix(string(s), "\n")
c.Assert(t, Equals, strconv.Itoa(i))
}
}
4 changes: 2 additions & 2 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,12 @@ func (s *ControllerSuite) SetUpSuite(c *C) {
_, err = s.crCli.Profiles(s.namespace).Create(p)
c.Assert(err, IsNil)

ss := testutil.NewTestStatefulSet()
ss := testutil.NewTestStatefulSet(1)
ss, err = s.cli.AppsV1().StatefulSets(s.namespace).Create(ss)
c.Assert(err, IsNil)
s.ss = ss

d := testutil.NewTestDeployment()
d := testutil.NewTestDeployment(1)
d, err = s.cli.AppsV1().Deployments(s.namespace).Create(d)
c.Assert(err, IsNil)
s.deployment = d
Expand Down
2 changes: 1 addition & 1 deletion pkg/envdir/envdir.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/pkg/errors"
)

func envDir(dir string) ([]string, error) {
func EnvDir(dir string) ([]string, error) {
fis, err := ioutil.ReadDir(dir)
if err != nil {
return nil, errors.Wrap(err, "failed to read env from dir:"+dir)
Expand Down
2 changes: 1 addition & 1 deletion pkg/envdir/envdir_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func (s *EnvDirSuite) TestEnvDir(c *C) {
p := filepath.Join(d, "FOO")
err := ioutil.WriteFile(p, []byte("BAR"), os.ModePerm)
c.Assert(err, IsNil)
e, err := envDir(d)
e, err := EnvDir(d)
c.Assert(err, IsNil)
c.Assert(e, DeepEquals, []string{"FOO=BAR"})
}
Loading

0 comments on commit ed7af31

Please sign in to comment.