Skip to content

Commit

Permalink
refactor execution
Browse files Browse the repository at this point in the history
  • Loading branch information
seletskiy committed May 27, 2016
1 parent 2d15b80 commit b50bb12
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 117 deletions.
89 changes: 10 additions & 79 deletions archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"io"
"os"
"path/filepath"
"sync"
"syscall"

"github.com/seletskiy/hierr"
Expand All @@ -16,89 +15,21 @@ func startArchiveReceivers(
lockedNodes *distributedLock,
rootDir string,
) (*remoteExecution, error) {
archiveReceiverCommandString := fmt.Sprintf(
`tar -x --verbose --directory="%s"`,
archiveReceiverCommand := []string{
`tar`, `-x`, `--verbose`, `--directory`,
rootDir,
)

unpackers := []io.WriteCloser{}

nodes := []remoteExecutionNode{}

logMutex := &sync.Mutex{}

for _, node := range lockedNodes.nodes {
tracef(
"%s",
hierr.Errorf(
archiveReceiverCommandString,
"%s starting archive receiver command",
node.String(),
).Error(),
)

archiveReceiverCommand, err := node.runner.Command(
archiveReceiverCommandString,
)
if err != nil {
return nil, hierr.Errorf(
err,
`can't create archive receiver command`,
)
}

stdin, err := archiveReceiverCommand.StdinPipe()
if err != nil {
return nil, hierr.Errorf(
err,
`can't get stdin from archive receiver command`,
)
}

unpackers = append(unpackers, stdin)

stdout := newLineFlushWriter(
logMutex,
newPrefixWriter(
newDebugWriter(logger),
node.String()+" {tar} <stdout> ",
),
true,
)
}

stderr := newLineFlushWriter(
logMutex,
newPrefixWriter(
newDebugWriter(logger),
node.String()+" {tar} <stderr> ",
),
true,
execution, err := runRemoteExecution(lockedNodes, archiveReceiverCommand)
if err != nil {
return nil, hierr.Errorf(
err,
`can't start tar extraction command: '%v'`,
archiveReceiverCommand,
)

archiveReceiverCommand.SetStdout(stdout)
archiveReceiverCommand.SetStderr(stderr)

err = archiveReceiverCommand.Start()
if err != nil {
return nil, hierr.Errorf(
err,
`can't start archive receiver command`,
)
}

nodes = append(nodes, remoteExecutionNode{
node: node,
command: archiveReceiverCommand,

stdout: stdout,
stderr: stderr,
})
}

return &remoteExecution{
stdin: multiWriteCloser{unpackers},
nodes: nodes,
}, nil
return execution, nil
}

func archiveFilesToWriter(target io.Writer, files []string) error {
Expand Down
50 changes: 26 additions & 24 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,18 @@ import (
"github.com/seletskiy/hierr"
)

func runCommand(
func runRemoteExecution(
lockedNodes *distributedLock,
command []string,
verbosityLevel verbosity,
) error {
commandString := joinCommand(command)
) (*remoteExecution, error) {
var (
stdins = []io.WriteCloser{}
remoteNodes = map[*distributedLockNode]*remoteExecutionNode{}

remoteCommands := map[*distributedLockNode]*remoteExecutionNode{}
commandString = joinCommand(command)

logMutex := &sync.Mutex{}
logMutex = &sync.Mutex{}
)

for _, node := range lockedNodes.nodes {
tracef(
Expand All @@ -34,15 +36,15 @@ func runCommand(
commandString,
)
if err != nil {
return hierr.Errorf(
return nil, hierr.Errorf(
err,
`can't create remote command`,
)
}

var stdout io.WriteCloser
var stderr io.WriteCloser
switch verbosityLevel {
switch verbose {
default:
stdout = newLineFlushWriter(
logMutex,
Expand Down Expand Up @@ -89,15 +91,25 @@ func runCommand(
remoteCommand.SetStdout(stdout)
remoteCommand.SetStderr(stderr)

stdin, err := remoteCommand.StdinPipe()
if err != nil {
return nil, hierr.Errorf(
err,
`can't get stdin from archive receiver command`,
)
}

err = remoteCommand.Start()
if err != nil {
return hierr.Errorf(
return nil, hierr.Errorf(
err,
`can't start remote command`,
)
}

remoteCommands[node] = &remoteExecutionNode{
stdins = append(stdins, stdin)

remoteNodes[node] = &remoteExecutionNode{
node: node,
command: remoteCommand,

Expand All @@ -106,21 +118,11 @@ func runCommand(
}
}

for node, remoteCommand := range remoteCommands {
err := remoteCommand.command.Wait()
_ = remoteCommand.stdout.Close()
_ = remoteCommand.stderr.Close()
if err != nil {
return hierr.Errorf(
err,
`%s can't wait for remote command to finish: '%s'`,
node.String(),
commandString,
)
}
}
return &remoteExecution{
stdin: multiWriteCloser{stdins},

return nil
nodes: remoteNodes,
}, nil
}

func joinCommand(command []string) string {
Expand Down
6 changes: 3 additions & 3 deletions lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ func acquireDistributedLock(
if err != nil {
errors <- hierr.Errorf(
err,
`can't create runner for address '%s'`,
`can't create runner for address: %s`,
nodeAddress,
)

return
}

debugf(`%4d/%d connection established: '%s'`,
debugf(`%4d/%d connection established: %s`,
atomic.AddInt64(&nodeIndex, 1),
len(addresses),
nodeAddress,
Expand All @@ -39,7 +39,7 @@ func acquireDistributedLock(
if err != nil {
errors <- hierr.Errorf(
err,
`can't add host to the global cluster lock: '%s'`,
`can't add host to the global cluster lock: %s`,
nodeAddress,
)

Expand Down
24 changes: 20 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,10 @@ func main() {

verbose = parseVerbosity(args)

if verbose >= verbosityDebug {
logger.SetLevel(lorg.LevelWarning)

switch {
case verbose >= verbosityDebug:
logger.SetLevel(lorg.LevelDebug)
}

Expand Down Expand Up @@ -229,17 +232,30 @@ func command(args map[string]interface{}) error {
)
}

logger.Debugf(`global lock acquired on %d nodes`, len(cluster.nodes))
debugf(`global lock acquired on %d nodes`, len(cluster.nodes))

debugf(`running command`)

err = runCommand(cluster, commandToRun, parseVerbosity(args))
execution, err := runRemoteExecution(
cluster,
commandToRun,
)
if err != nil {
return hierr.Errorf(
err,
`can't run command on %d nodes`,
`can't run remote execution on %d nodes`,
len(cluster.nodes),
)
}

err = execution.wait()
if err != nil {
return hierr.Errorf(
err,
`remote execution failed`,
)
}

return nil
}

Expand Down
2 changes: 1 addition & 1 deletion multiwrite_closer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func (closer multiWriteCloser) Close() error {

for _, closer := range closer.writers {
err := closer.Close()
if err != nil {
if err != nil && err != io.EOF {
errs = append(errs, err.Error())
}
}
Expand Down
31 changes: 26 additions & 5 deletions remote_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,18 @@ import (

type remoteExecution struct {
stdin io.WriteCloser
nodes []remoteExecutionNode
nodes map[*distributedLockNode]*remoteExecutionNode
}

type remoteExecutionResult struct {
node *remoteExecutionNode

err error
}

func (execution *remoteExecution) wait() error {
tracef("waiting %d nodes to finish", len(execution.nodes))

err := execution.stdin.Close()
if err != nil {
return hierr.Errorf(
Expand All @@ -20,12 +28,25 @@ func (execution *remoteExecution) wait() error {
)
}

results := make(chan *remoteExecutionResult, 0)
for _, node := range execution.nodes {
err := node.wait()
if err != nil {
go func(node *remoteExecutionNode) {
results <- &remoteExecutionResult{node, node.wait()}
}(node)
}

for range execution.nodes {
result := <-results
if result.err != nil {
return hierr.Errorf(
err,
`wait finished with error`,
result.err,
`%s has finished with error`,
result.node.node.String(),
)
} else {
infof(
`%s has successfully finished execution`,
result.node.node.String(),
)
}
}
Expand Down
4 changes: 3 additions & 1 deletion remote_execution_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ type remoteExecutionNode struct {

func (node *remoteExecutionNode) wait() error {
err := node.command.Wait()
_ = node.stdout.Close()
_ = node.stderr.Close()
if err != nil {
if sshErr, ok := err.(*ssh.ExitError); ok {
return fmt.Errorf(
`%s failed to evaluate command, `+
`%s had failed to evaluate command, `+
`remote command exited with non-zero code: %d`,
node.node.String(),
sshErr.Waitmsg.ExitStatus(),
Expand Down

0 comments on commit b50bb12

Please sign in to comment.