Skip to content

Commit

Permalink
refactor running
Browse files Browse the repository at this point in the history
  • Loading branch information
seletskiy committed Jun 8, 2016
1 parent 207cae4 commit 7f19202
Show file tree
Hide file tree
Showing 8 changed files with 166 additions and 65 deletions.
28 changes: 14 additions & 14 deletions archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,32 +15,32 @@ import (
)

func startArchiveReceivers(
lockedNodes *distributedLock,
cluster *distributedLock,
rootDir string,
sudo bool,
) (*remoteExecution, error) {
archiveReceiverCommand := []string{}
command := []string{}

prefix := []string{}
if sudo {
archiveReceiverCommand = []string{`sudo`, `-n`}
prefix = []string{`sudo`, `-n`}
}

archiveReceiverCommand = append(
archiveReceiverCommand,
[]string{
`tar`, `-x`, `--directory`, rootDir,
}...,
)
command = append(command, prefix...)
command = append(command, `mkdir`, `-p`, rootDir, `&&`)
command = append(command, prefix...)
command = append(command, `tar`, `--directory`, rootDir, `-x`)

if verbose >= verbosityDebug {
archiveReceiverCommand = append(archiveReceiverCommand, `--verbose`)
command = append(command, `--verbose`)
}

logMutex := &sync.Mutex{}

execution, err := runRemoteExecution(
lockedNodes,
archiveReceiverCommand,
runner := &remoteExecutionRunner{command: command}

execution, err := runner.run(
cluster,
func(node *remoteExecutionNode) {
node.stdout = lineflushwriter.New(
prefixwriter.New(node.stdout, "{tar} "),
Expand All @@ -59,7 +59,7 @@ func startArchiveReceivers(
return nil, hierr.Errorf(
err,
`can't start tar extraction command: '%v'`,
archiveReceiverCommand,
command,
)
}

Expand Down
28 changes: 6 additions & 22 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package main
import (
"io"
"os"
"strings"
"sync"

"github.com/reconquest/go-lineflushwriter"
Expand All @@ -13,15 +12,13 @@ import (

func runRemoteExecution(
lockedNodes *distributedLock,
command []string,
callback func(*remoteExecutionNode),
command string,
setupCallback func(*remoteExecutionNode),
) (*remoteExecution, error) {
var (
stdins = []io.WriteCloser{}
remoteNodes = map[*distributedLockNode]*remoteExecutionNode{}

commandString = joinCommand(command)

logMutex = &sync.Mutex{}
nodesMapMutex = &sync.Mutex{}
)
Expand All @@ -32,24 +29,24 @@ func runRemoteExecution(
tracef(
"%s",
hierr.Errorf(
commandString,
command,
"%s starting command",
node.String(),
).Error(),
)

remoteNode, err := runRemoteExecutionNode(
node,
commandString,
command,
logMutex,
)
if err != nil {
errors <- err
return
}

if callback != nil {
callback(remoteNode)
if setupCallback != nil {
setupCallback(remoteNode)
}

remoteNode.command.SetStdout(remoteNode.stdout)
Expand Down Expand Up @@ -171,16 +168,3 @@ func runRemoteExecutionNode(
stderr: stderr,
}, nil
}

func joinCommand(command []string) string {
escapedParts := []string{}

for _, part := range command {
part = strings.Replace(part, `\`, `\\`, -1)
part = strings.Replace(part, ` `, `\ `, -1)

escapedParts = append(escapedParts, part)
}

return strings.Join(escapedParts, " ")
}
83 changes: 57 additions & 26 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"os"
"os/user"
"path/filepath"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -72,8 +73,8 @@ received sync messages.
Usage:
orgalorg -h | --help
orgalorg [options] [-v]... [-o <host>...] -r= -U <files>...
orgalorg [options] [-v]... [-o <host>...] [-r=] -S <files>...
orgalorg [options] [-v]... [-o <host>...] -C [--] <command>...
orgalorg [options] [-v]... [-o <host>...] [-r=] [-g=]... -S <files>...
orgalorg [options] [-v]... [-o <host>...] [-r=] -C [--] <command>...
orgalorg [options] [-v]... [-o <host>...] -L
Operation mode options:
Expand Down Expand Up @@ -111,10 +112,12 @@ Options:
By default, orgalorg will not obtain root and do
all actions from specified user. To change that
behaviour, this option can be used.
-t --no-lock-fail Try to obtain global lock, but only print warning if
-t --no-lock-fail Try to obtain global lock, but only print warning if
it cannot be done, do not stop execution.
-r --root <root> Specify root dir to extract files into.
[default: /var/run/orgalorg/files/$RUNID]
By default, orgalorg will create temporary directory
inside of '$ROOT'.
Removal of that directory is up to sync tool.
-u --user <user> Username used for connecting to all hosts by default.
[default: $USER]
-i --stdin <file> Pass specified file as input for the command.
Expand Down Expand Up @@ -143,6 +146,12 @@ Advanced options:
support specified protocol messages. No sync
is possible in that case and all stdout and stderr
will be passed untouched back to the orgalorg.
--shell <shell> Use following shell wrapper. '{}' will be replaced
with properly escaped command. If empty, then no
shell wrapper will be used. If any args are given
using '-g', they will be appended to shell
invocation.
[default: bash -c $'{}']
--no-preserve-uid Do not preserve UIDs for transferred files.
--no-preserve-gid Do not preserve GIDs for transferred files.
Expand All @@ -165,6 +174,8 @@ const (
sshPasswordPrompt = "Password: "

heartbeatTimeoutCoefficient = 0.8

runsDirectory = "/var/run/orgalorg/"
)

var (
Expand All @@ -190,7 +201,7 @@ func main() {

usage := strings.Replace(usage, "$USER", currentUser.Username, -1)
usage = strings.Replace(usage, "$HOME", currentUser.HomeDir, -1)
usage = strings.Replace(usage, "$RUNID", generateRunID(), -1)
usage = strings.Replace(usage, "$ROOT", runsDirectory, -1)
args, err := docopt.Parse(usage, nil, true, version, true)
if err != nil {
panic(err)
Expand All @@ -215,7 +226,7 @@ func main() {
case args["--sync"].(bool):
err = synchronize(args)
case args["--command"].(bool):
err = command(args)
err = evaluate(args)
}

if err != nil {
Expand All @@ -225,16 +236,18 @@ func main() {
}
}

func command(args map[string]interface{}) error {
func evaluate(args map[string]interface{}) error {
var (
stdin, _ = args["--stdin"].(string)
sudo = args["--sudo"].(bool)
stdin, _ = args["--stdin"].(string)
rootDir, _ = args["--root"].(string)
sudo = args["--sudo"].(bool)
shell = args["--shell"].(string)

commandToRun = args["<command>"].([]string)
command = args["<command>"].([]string)
)

if sudo {
commandToRun = append([]string{"sudo", "-n"}, commandToRun...)
command = append([]string{"sudo", "-n"}, command...)
}

canceler := sync.NewCond(&sync.Mutex{})
Expand All @@ -244,21 +257,23 @@ func command(args map[string]interface{}) error {
return err
}

return run(cluster, commandToRun, stdin)
runner := &remoteExecutionRunner{
shell: shell,
command: command,
directory: rootDir,
}

return run(cluster, runner, stdin)
}

func run(
cluster *distributedLock,
commandToRun []string,
runner *remoteExecutionRunner,
stdin string,
) error {
debugf(`running command`)

execution, err := runRemoteExecution(
cluster,
commandToRun,
nil,
)
execution, err := runner.run(cluster, nil)
if err != nil {
return hierr.Errorf(
err,
Expand Down Expand Up @@ -309,12 +324,17 @@ func run(

func synchronize(args map[string]interface{}) error {
var (
stdin, _ = args["--stdin"].(string)
rootDir, _ = args["--root"].(string)
lockOnly = args["--lock"].(bool)
uploadOnly = args["--upload"].(bool)
relative = args["--relative"].(bool)
syncCmd = args["--sync-cmd"].(string)
isSimpleCmd = args["--simple"].(bool)
stdin, _ = args["--stdin"].(string)

commandString = args["--sync-cmd"].(string)
commandArgs = args["--args"].([]string)

shell = args["--shell"].(string)

fileSources = args["<files>"].([]string)
)
Expand Down Expand Up @@ -372,19 +392,26 @@ func synchronize(args map[string]interface{}) error {

tracef(`starting sync tool`)

syncCmdParsed, err := shellwords.NewParser().Parse(syncCmd)
command, err := shellwords.NewParser().Parse(commandString)
if err != nil {
return hierr.Errorf(
err,
`can't parse sync tool command: '%s'`,
syncCmd,
commandString,
)
}

runner := &remoteExecutionRunner{
shell: shell,
command: command,
args: commandArgs,
directory: rootDir,
}

if isSimpleCmd {
return run(cluster, syncCmdParsed, stdin)
return run(cluster, runner, stdin)
} else {
err := runSyncProtocol(cluster, syncCmdParsed)
err := runSyncProtocol(cluster, runner)
if err != nil {
return hierr.Errorf(
err,
Expand All @@ -402,13 +429,17 @@ func upload(
filesList []string,
) error {
var (
rootDir = args["--root"].(string)
sudo = args["--sudo"].(bool)
rootDir, _ = args["--root"].(string)
sudo = args["--sudo"].(bool)

preserveUID = !args["--no-preserve-uid"].(bool)
preserveGID = !args["--no-preserve-gid"].(bool)
)

if rootDir == "" {
rootDir = filepath.Join(runsDirectory, generateRunID())
}

debugf(`file upload started into: '%s'`, rootDir)

receivers, err := startArchiveReceivers(cluster, rootDir, sudo)
Expand Down
70 changes: 70 additions & 0 deletions remote_execution_runner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package main

import "strings"

type remoteExecutionRunner struct {
shell string
command []string
args []string
directory string
}

func (runner *remoteExecutionRunner) run(
cluster *distributedLock,
setupCallback func(*remoteExecutionNode),
) (*remoteExecution, error) {
command := joinCommand(runner.command)

if runner.shell != "" {
command = wrapCommandIntoShell(command, runner.shell, runner.args)
}

if runner.directory != "" {
command = "cd " +
escapeCommandArgumentStrict(runner.directory) + " && " +
"{ " + command + "; }"
}

return runRemoteExecution(cluster, command, setupCallback)
}

func wrapCommandIntoShell(command string, shell string, args []string) string {
if shell == "" {
return command
}

escapedArgs := []string{}
for _, arg := range args {
escapedArgs = append(escapedArgs, escapeCommandArgumentStrict(arg))
}

return strings.Replace(shell, `{}`, command, -1) +
" _ " +
strings.Join(escapedArgs, " ")
}

func joinCommand(command []string) string {
escapedParts := []string{}

for _, part := range command {
escapedParts = append(escapedParts, escapeCommandArgument(part))
}

return strings.Join(escapedParts, ` `)
}

func escapeCommandArgument(argument string) string {
argument = strings.Replace(argument, `\`, `\\`, -1)
argument = strings.Replace(argument, ` `, `\ `, -1)

return argument
}

func escapeCommandArgumentStrict(argument string) string {
argument = strings.Replace(argument, `\`, `\\`, -1)
argument = strings.Replace(argument, "`", "\\`", -1)
argument = strings.Replace(argument, `"`, `\"`, -1)
argument = strings.Replace(argument, `$`, `\$`, -1)

return `"` + argument + `"`
}
Loading

0 comments on commit 7f19202

Please sign in to comment.