Skip to content

Commit

Permalink
#96 Step can pass results of command to another step
Browse files Browse the repository at this point in the history
  • Loading branch information
kilgaloon committed Apr 25, 2019
1 parent c22c348 commit 69f112a
Show file tree
Hide file tree
Showing 8 changed files with 4,131 additions and 15 deletions.
18 changes: 17 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@



Current Version: **1.1.0**
Current Version: **1.2.0**

Current Release: **Calimero**

Expand Down Expand Up @@ -101,6 +101,22 @@ but in this case for example first task will block performing on any task and al
- ping google.com
- -> echo "I need to wait above step to finish, then i can do my stuff"

## Step Pipe

Output from one step can be passed to input of next step:

name: job1 // name of recipe
definition: schedule
schedule:
min: 0 // every min
hour: 0 // every hour
day: 0 // every day
steps: // steps are done from first to last
- echo "Pipe this to next step" }>
- cat > piped.txt

As you see, first step is using syntax `}>` at the end, which tells that this command output will be passed to next command input, you can chain like this how much you want.
Although this is perfectly fine syntax for step `-> echo "Pipe this to next step" }>` (we added async syntax), next step will start even before first step finished and basically nothing will be piped.

## Installation

Expand Down
10 changes: 10 additions & 0 deletions dist/recipes/job4.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
name: job4 # name of recipe
definition: schedule
schedule:
min: 0
hour: 0
day: 0

steps: # steps are done from first to last
- echo "I like golang" }>
- cat > i_like_it.txt
1 change: 1 addition & 0 deletions tests/configs/config_global_fb.ini
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
error_reporting = false
# we want to log our errors, and specify where
error_log = ../tests/var/log/leprechaun/error.log
# we want to log our info about app to know whats going on
Expand Down
2 changes: 2 additions & 0 deletions tests/configs/config_regular.ini
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
daemon.error_reporting = false

# we want to log our errors, and specify where
test.error_log = ../tests/var/log/leprechaun/error.log
# we want to log our info about app to know whats going on
Expand Down
578 changes: 578 additions & 0 deletions tests/var/log/leprechaun/error.log

Large diffs are not rendered by default.

3,436 changes: 3,436 additions & 0 deletions tests/var/log/leprechaun/info.log

Large diffs are not rendered by default.

59 changes: 59 additions & 0 deletions workers/cmd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package workers

import (
"bytes"
"io"
"os/exec"
"strings"
)

// PipeMarker is string that we mark when we want to pipe output of results
// from step command to another step command
const PipeMarker = "}>"

type cmd struct {
stdin bytes.Buffer
cmd *exec.Cmd
stdout bytes.Buffer
pipe bool
}

func (c *cmd) Run() error {
if &c.stdin != nil {
in, err := c.cmd.StdinPipe()
if err != nil {
return err
}

_, err = io.WriteString(in, string(c.stdin.Bytes()))
if err != nil {
return err
}
}

var stderr bytes.Buffer
c.cmd.Stdout = &c.stdout
c.cmd.Stderr = &stderr

err := c.cmd.Run()

return err
}

// NewCmd build new command and prepare it to be run
func newCmd(step string, i bytes.Buffer) (*cmd, error) {
cmd := &cmd{
stdin: i,
pipe: false,
}

s := strings.Fields(step)
if s[len(s)-1] == PipeMarker {
cmd.pipe = true
step = strings.Join(s[:(len(s)-1)], " ")
}

cmd.cmd = exec.Command("bash", "-c", step)

return cmd, nil
}
42 changes: 28 additions & 14 deletions workers/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ const AsyncMarker = "->"
type Worker struct {
StartedAt time.Time
WorkingOn string
Steps []string
Context *context.Context
Logs log.Logs
DoneChan chan string
ErrorChan chan Worker
TasksPerformed int
Cmd map[string]*exec.Cmd
steps []*cmd
Stdout *os.File
Recipe *recipe.Recipe
Err error
Expand All @@ -39,34 +39,48 @@ type Worker struct {
func (w *Worker) Run() {
w.StartedAt = time.Now()

for _, step := range w.Recipe.GetSteps() {
for i, step := range w.Recipe.GetSteps() {
w.Logs.Info("Step %s is in progress... \n", step)
// replace variables
parts := strings.Fields(step)

if len(step) < 1 {
continue
}

parts := strings.Fields(step)
if parts[0] == AsyncMarker {
step = w.Context.Transpile(strings.Join(parts[1:], " "))
go w.workOnStep(step)
go w.workOnStep(i, step)
} else {
step = w.Context.Transpile(step)
w.workOnStep(step)
w.workOnStep(i, step)
}
}
}

func (w *Worker) workOnStep(step string) {
func (w *Worker) workOnStep(i int, step string) {
w.mu.Lock()
cmd := exec.Command("bash", "-c", step)

var stderr bytes.Buffer
cmd.Stdout = w.Stdout
cmd.Stderr = &stderr
var in bytes.Buffer
if len(w.steps) > 0 {
ps := w.steps[i-1]
if ps.pipe {
in = ps.stdout
}
}

cmd, err := newCmd(step, in)
if err != nil {
w.Logs.Error(err.Error())
}

w.WorkingOn = step
w.Cmd[step] = cmd
w.Cmd[step] = cmd.cmd
w.mu.Unlock()

err := cmd.Run()
err = cmd.Run()

w.mu.Lock()
w.steps = append(w.steps, cmd)
w.mu.Unlock()

if err != nil {
w.mu.Lock()
Expand Down

0 comments on commit 69f112a

Please sign in to comment.