diff --git a/utils_test.go b/address_test.go similarity index 75% rename from utils_test.go rename to address_test.go index 86f8430..bbd8118 100644 --- a/utils_test.go +++ b/address_test.go @@ -6,7 +6,7 @@ import ( "github.com/stretchr/testify/assert" ) -func TestCanParseDomainValidDomainAddressWithDefaults(t *testing.T) { +func TestParseAddress_ParseValidDomainAddressWithDefaults(t *testing.T) { assert := assert.New(t) tests := []struct { @@ -38,13 +38,13 @@ func TestCanParseDomainValidDomainAddressWithDefaults(t *testing.T) { } for _, test := range tests { - user, domain, port, err := parseAddress( + address, err := parseAddress( test.Host, test.DefaultUser, test.DefaultPort, ) assert.Nil(err) - assert.Equal(test.ExpectedUser, user) - assert.Equal(test.ExpectedPort, port) - assert.Equal(test.ExpectedDomain, domain) + assert.Equal(test.ExpectedUser, address.user) + assert.Equal(test.ExpectedPort, address.port) + assert.Equal(test.ExpectedDomain, address.domain) } } diff --git a/archive.go b/archive.go index 2d3fa23..fcc9730 100644 --- a/archive.go +++ b/archive.go @@ -6,8 +6,11 @@ import ( "io" "os" "path/filepath" + "sync" "syscall" + "github.com/reconquest/go-lineflushwriter" + "github.com/reconquest/go-prefixwriter" "github.com/seletskiy/hierr" ) @@ -25,12 +28,33 @@ func startArchiveReceivers( archiveReceiverCommand = append( archiveReceiverCommand, []string{ - `tar`, `-x`, `--verbose`, `--directory`, - rootDir, + `tar`, `-x`, `--directory`, rootDir, }..., ) - execution, err := runRemoteExecution(lockedNodes, archiveReceiverCommand) + if verbose >= verbosityDebug { + archiveReceiverCommand = append(archiveReceiverCommand, `--verbose`) + } + + logMutex := &sync.Mutex{} + + execution, err := runRemoteExecution( + lockedNodes, + archiveReceiverCommand, + func(node *remoteExecutionNode) { + node.stdout = lineflushwriter.New( + prefixwriter.New(node.stdout, "{tar} "), + logMutex, + true, + ) + + node.stderr = lineflushwriter.New( + prefixwriter.New(node.stderr, "{tar} "), + logMutex, + true, + ) + }, + ) if err != nil { return nil, hierr.Errorf( err, @@ -43,7 +67,7 @@ func startArchiveReceivers( } func archiveFilesToWriter( - target io.Writer, + target io.WriteCloser, files []string, preserveUID, preserveGID bool, ) error { @@ -57,20 +81,27 @@ func archiveFilesToWriter( archive := tar.NewWriter(target) for fileIndex, fileName := range files { - logger.Infof( + infof( "%5d/%d sending file: '%s'", fileIndex+1, len(files), fileName, ) - writeFileToArchive( + err := writeFileToArchive( fileName, archive, workDir, preserveUID, preserveGID, ) + if err != nil { + return hierr.Errorf( + err, + `can't write file to archive: '%s'`, + fileName, + ) + } } tracef("closing archive stream, %d files sent", len(files)) @@ -83,6 +114,14 @@ func archiveFilesToWriter( ) } + err = target.Close() + if err != nil { + return hierr.Errorf( + err, + `can't close target stdin`, + ) + } + return nil } diff --git a/command.go b/command.go index fd8359f..2c40841 100644 --- a/command.go +++ b/command.go @@ -6,12 +6,15 @@ import ( "strings" "sync" + "github.com/reconquest/go-lineflushwriter" + "github.com/reconquest/go-prefixwriter" "github.com/seletskiy/hierr" ) func runRemoteExecution( lockedNodes *distributedLock, command []string, + callback func(*remoteExecutionNode), ) (*remoteExecution, error) { var ( stdins = []io.WriteCloser{} @@ -45,6 +48,23 @@ func runRemoteExecution( return } + if callback != nil { + callback(remoteNode) + } + + remoteNode.command.SetStdout(remoteNode.stdout) + remoteNode.command.SetStderr(remoteNode.stderr) + + err = remoteNode.command.Start() + if err != nil { + errors <- hierr.Errorf( + err, + `can't start remote command`, + ) + + return + } + nodesMapMutex.Lock() { stdins = append(stdins, remoteNode.stdin) @@ -67,7 +87,7 @@ func runRemoteExecution( } return &remoteExecution{ - stdin: multiWriteCloser{stdins}, + stdin: &multiWriteCloser{stdins}, nodes: remoteNodes, }, nil @@ -89,52 +109,51 @@ func runRemoteExecutionNode( var stdout io.WriteCloser var stderr io.WriteCloser switch verbose { - default: - stdout = newLineFlushWriter( - logMutex, - newPrefixWriter( - os.Stdout, + case verbosityQuiet: + stdout = lineflushwriter.New(nopCloser{os.Stdout}, logMutex, false) + stderr = lineflushwriter.New(nopCloser{os.Stderr}, logMutex, false) + + case verbosityNormal: + stdout = lineflushwriter.New( + prefixwriter.New( + nopCloser{os.Stdout}, node.address.domain+" ", ), + logMutex, true, ) - stderr = newLineFlushWriter( - logMutex, - newPrefixWriter( - os.Stderr, + stderr = lineflushwriter.New( + prefixwriter.New( + nopCloser{os.Stderr}, node.address.domain+" ", ), + logMutex, true, ) - case verbosityQuiet: - stdout = newLineFlushWriter(logMutex, os.Stdout, false) - stderr = newLineFlushWriter(logMutex, os.Stderr, false) - + default: + fallthrough case verbosityDebug: - stdout = newLineFlushWriter( - logMutex, - newPrefixWriter( + stdout = lineflushwriter.New( + prefixwriter.New( newDebugWriter(logger), node.String()+" {cmd} ", ), + logMutex, false, ) - stderr = newLineFlushWriter( - logMutex, - newPrefixWriter( + stderr = lineflushwriter.New( + prefixwriter.New( newDebugWriter(logger), node.String()+" {cmd} ", ), + logMutex, false, ) } - remoteCommand.SetStdout(stdout) - remoteCommand.SetStderr(stderr) - stdin, err := remoteCommand.StdinPipe() if err != nil { return nil, hierr.Errorf( @@ -143,14 +162,6 @@ func runRemoteExecutionNode( ) } - err = remoteCommand.Start() - if err != nil { - return nil, hierr.Errorf( - err, - `can't start remote command`, - ) - } - return &remoteExecutionNode{ node: node, command: remoteCommand, diff --git a/debug.go b/debug.go index 33f874b..14f3863 100644 --- a/debug.go +++ b/debug.go @@ -33,3 +33,10 @@ func infof(format string, args ...interface{}) { logger.Infof(format, args...) } + +func warningf(format string, args ...interface{}) { + loggerLock.Lock() + defer loggerLock.Unlock() + + logger.Warningf(format, args...) +} diff --git a/debug_writer.go b/debug_writer.go index 8226d5c..bbe4bb4 100644 --- a/debug_writer.go +++ b/debug_writer.go @@ -21,3 +21,7 @@ func (writer debugWriter) Write(data []byte) (int, error) { return len(data), nil } + +func (writer debugWriter) Close() error { + return nil +} diff --git a/distributed_lock.go b/distributed_lock.go index 9e1e93e..3f96fb0 100644 --- a/distributed_lock.go +++ b/distributed_lock.go @@ -2,6 +2,7 @@ package main import ( "strings" + "time" "github.com/seletskiy/hierr" "github.com/theairkit/runcmd" @@ -9,6 +10,8 @@ import ( type distributedLock struct { nodes []*distributedLockNode + + noFail bool } func (lock *distributedLock) addNodeRunner( @@ -32,9 +35,17 @@ func (lock *distributedLock) acquire(filename string) error { node.String(), ) - _, err := node.lock(filename) - + err := node.lock(filename) if err != nil { + if lock.noFail { + warningf( + hierr.Errorf( + err, + `failed to acquire lock, but continuing execution`, + ).Error(), + ) + } + nodes := []string{} for _, node := range lock.nodes { nodes = append(nodes, node.String()) @@ -51,3 +62,9 @@ func (lock *distributedLock) acquire(filename string) error { return nil } + +func (lock *distributedLock) runHeartbeats(period time.Duration) { + for _, node := range lock.nodes { + go heartbeat(period, node) + } +} diff --git a/distributed_lock_node.go b/distributed_lock_node.go index 1bb4992..c76a862 100644 --- a/distributed_lock_node.go +++ b/distributed_lock_node.go @@ -7,6 +7,8 @@ import ( "strings" "sync" + "github.com/reconquest/go-lineflushwriter" + "github.com/reconquest/go-prefixwriter" "github.com/seletskiy/hierr" "github.com/theairkit/runcmd" ) @@ -19,23 +21,26 @@ const ( type distributedLockNode struct { address address runner runcmd.Runner + + connection *distributedLockConnection } func (node *distributedLockNode) String() string { return node.address.String() } -type distributedLockReleaser struct { - lockReadStdin io.WriteCloser +type distributedLockConnection struct { + stdin io.WriteCloser + stdout io.Reader } func (node *distributedLockNode) lock( filename string, -) (*distributedLockReleaser, error) { +) error { lockCommandString := fmt.Sprintf( `sh -c $'`+ `flock -nx %s -c \'`+ - `printf "%s\\n" && read unlock\' || `+ + `printf "%s\\n" && cat\' || `+ `printf "%s\\n"'`, filename, lockAcquiredString, @@ -52,23 +57,23 @@ func (node *distributedLockNode) lock( lockCommand, err := node.runner.Command(lockCommandString) if err != nil { - return nil, err + return err } stdout, err := lockCommand.StdoutPipe() if err != nil { - return nil, hierr.Errorf( + return hierr.Errorf( err, `can't get control stdout pipe from lock process`, ) } - stderr := newLineFlushWriter( - logMutex, - newPrefixWriter( + stderr := lineflushwriter.New( + prefixwriter.New( newDebugWriter(logger), fmt.Sprintf("%s {flock} ", node.String()), ), + logMutex, true, ) @@ -76,7 +81,7 @@ func (node *distributedLockNode) lock( stdin, err := lockCommand.StdinPipe() if err != nil { - return nil, hierr.Errorf( + return hierr.Errorf( err, `can't get control stdin pipe to lock process`, ) @@ -84,7 +89,7 @@ func (node *distributedLockNode) lock( err = lockCommand.Start() if err != nil { - return nil, hierr.Errorf( + return hierr.Errorf( err, `%s can't start lock command: '%s`, node, lockCommandString, @@ -93,7 +98,7 @@ func (node *distributedLockNode) lock( line, err := bufio.NewReader(stdout).ReadString('\n') if err != nil { - return nil, hierr.Errorf( + return hierr.Errorf( err, `can't read line from lock process`, ) @@ -104,7 +109,7 @@ func (node *distributedLockNode) lock( // pass case lockLockedString: - return nil, fmt.Errorf( + return fmt.Errorf( `%s can't acquire lock, `+ `lock already obtained by another process `+ `or unavailable`, @@ -112,7 +117,7 @@ func (node *distributedLockNode) lock( ) default: - return nil, fmt.Errorf( + return fmt.Errorf( `%s unexpected reply string encountered `+ `instead of '%s' or '%s': '%s'`, node, lockAcquiredString, lockLockedString, @@ -122,7 +127,10 @@ func (node *distributedLockNode) lock( tracef(`lock acquired: '%s' on '%s'`, node, filename) - return &distributedLockReleaser{ - lockReadStdin: stdin, - }, nil + node.connection = &distributedLockConnection{ + stdin: stdin, + stdout: stdout, + } + + return nil } diff --git a/heartbeat.go b/heartbeat.go new file mode 100644 index 0000000..5afbfe8 --- /dev/null +++ b/heartbeat.go @@ -0,0 +1,41 @@ +package main + +import ( + "bufio" + "io" + "strings" + "time" + + "github.com/seletskiy/hierr" +) + +const ( + heartbeatPing = "PING" +) + +func heartbeat(period time.Duration, node *distributedLockNode) { + ticker := time.Tick(period) + + for { + _, err := io.WriteString(node.connection.stdin, heartbeatPing+"\n") + if err != nil { + logger.Fatal(hierr.Errorf(err, `can't send heartbeat`)) + } + + <-ticker + + ping, err := bufio.NewReader(node.connection.stdout).ReadString('\n') + if err != nil { + logger.Fatal(hierr.Errorf(err, `can't receive heartbeat`)) + } + + if strings.TrimSpace(ping) != heartbeatPing { + logger.Fatalf( + `received unexpected heartbeat ping: '%s'`, + ping, + ) + } + + tracef(`%s heartbeat`, node.String()) + } +} diff --git a/line_flush_writer.go b/line_flush_writer.go deleted file mode 100644 index 293f7b9..0000000 --- a/line_flush_writer.go +++ /dev/null @@ -1,88 +0,0 @@ -package main - -import ( - "bufio" - "bytes" - "io" - "strings" - "sync" -) - -type lineFlushWriter struct { - mutex *sync.Mutex - writer io.Writer - buffer *bytes.Buffer - - newlineAtEnd bool -} - -func newLineFlushWriter( - mutex *sync.Mutex, - writer io.Writer, - newlineAtEnd bool, -) lineFlushWriter { - return lineFlushWriter{ - writer: writer, - mutex: mutex, - buffer: &bytes.Buffer{}, - - newlineAtEnd: newlineAtEnd, - } -} - -func (writer lineFlushWriter) Write(data []byte) (int, error) { - written, err := writer.buffer.Write(data) - if err != nil { - return written, err - } - - var ( - reader = bufio.NewReader(writer.buffer) - eof = false - ) - - for !eof { - line, err := reader.ReadString('\n') - - writer.mutex.Lock() - - if err != nil { - if err != io.EOF { - writer.mutex.Unlock() - return 0, err - } else { - eof = true - } - } - - var written int - if eof { - writer.buffer.Reset() - written, err = writer.buffer.WriteString(line) - } else { - written, err = writer.writer.Write([]byte(line)) - } - - writer.mutex.Unlock() - if err != nil { - return written, err - } - } - - return written, nil -} - -func (writer lineFlushWriter) Close() error { - if writer.newlineAtEnd && writer.buffer.Len() > 0 { - if !strings.HasSuffix(writer.buffer.String(), "\n") { - _, err := writer.buffer.WriteString("\n") - if err != nil { - return err - } - } - } - - _, err := writer.writer.Write(writer.buffer.Bytes()) - return err - return nil -} diff --git a/lock.go b/lock.go index 50f6b03..e2b547b 100644 --- a/lock.go +++ b/lock.go @@ -1,17 +1,26 @@ package main -import "github.com/seletskiy/hierr" -import "sync/atomic" +import ( + "sync" + "sync/atomic" + + "github.com/seletskiy/hierr" +) func acquireDistributedLock( lockFile string, runnerFactory runnerFactory, addresses []address, ) (*distributedLock, error) { - lock := &distributedLock{} + var ( + lock = &distributedLock{} + + nodeIndex = int64(0) + errors = make(chan error, 0) + + mutex = &sync.Mutex{} + ) - nodeIndex := int64(0) - errors := make(chan error, 0) for _, nodeAddress := range addresses { go func(nodeAddress address) { tracef(`connecting to address: '%s'`, @@ -35,7 +44,11 @@ func acquireDistributedLock( nodeAddress, ) - err = lock.addNodeRunner(runner, nodeAddress) + mutex.Lock() + { + err = lock.addNodeRunner(runner, nodeAddress) + } + mutex.Unlock() if err != nil { errors <- hierr.Errorf( err, diff --git a/main.go b/main.go index 7ace109..4f17d2e 100644 --- a/main.go +++ b/main.go @@ -6,6 +6,7 @@ import ( "io" "os" "os/user" + "strconv" "strings" "sync" "time" @@ -14,6 +15,7 @@ import ( "github.com/docopt/docopt-go" "github.com/kovetskiy/lorg" + "github.com/mattn/go-shellwords" "github.com/seletskiy/hierr" ) @@ -30,44 +32,66 @@ and ownerships, then upload archive in parallel to the specified hosts and unpacks it in the temporary directory (see '-r'). No further actions will be done until all hosts unpacks the archive. -If no '-d' flag specified, after upload post-action tool will be launched ( -see '--post-action'). Post-action tool can send stdout and stderr back to -the orgalorg, but it need to be prefixed with special prefix, passed in the -first argument. Additionally, post-action tool can separate it's actions into -several phases, which should be synced across all cluster. To do it, tool -should send 'SYNC' message to the stdout as soon as phase is done. Then, tool -can receive 'SYNC ' back from orgalorg as many times as there are -messages sent from other hosts. Tool can decide how soon it should continue -based on the ammount received 'SYNC' messages. +If '-S' flag specified, then sync command tool will be launched after upload +(see '--sync-cmd'). Sync command tool can send stdout and stderr back to the +orgalorg, but it need to be compatible with following procotol. -Restrictions: +First of all, sync command tool and orgalorg communicate through stdout/stdin. +All lines, that are not match protocol will be printed untouched. - * only one authentication method can be used, and corresponding - authentication data used for all specified hosts; +orgalorg first send hello message to the each running node, where '' +is an unique string + + HELLO + +All consequent communication must be prefixed by that prefix, followed by +space. + +Then, orgalorg will pass nodes list to each running node by sending 'NODE' +commands, where '' is unique node identifier: + + NODE + +After nodes list is exchausted, orgalorg will send 'START' marker, that means +sync tool may proceed with execution. + + START + +Then, sync command tool can reply with 'SYNC' messages, that will be +broadcasted to all connected nodes by orgalorg: + + SYNC + +Broadcasted sync message will contain source node: + + SYNC + +Each node can decide, when to wait synchronizations, based on amount of +received sync messages. Usage: orgalorg -h | --help - orgalorg [options] [-v]... (-o ...|-s) -S ... [-d|--stop-at-upload] - orgalorg [options] [-v]... (-o ...|-s) (-C|--command) [--] ... - orgalorg [options] [-v]... (-o ...|-s) (-L|--stop-at-lock) + orgalorg [options] [-v]... [-o ...] -r= -U ... + orgalorg [options] [-v]... [-o ...] [-r=] -S ... + orgalorg [options] [-v]... [-o ...] -C [--] ... + orgalorg [options] [-v]... [-o ...] -L -Operation modes: +Operation mode options: -S --sync Sync. - Synchronizes files on the specified hosts via 4-stage + Synchronizes files on the specified hosts via 3-stage process: * global cluster locking (use -L to stop here); * tar-ing files on local machine, transmitting and - unpacking files to the intermediate directory - (-d to stop here); - * launching post-action tool such as gunter; - -L --stop-at-lock Will stop right after locking, e.g. will not try to + unpacking files to the intermediate directory + (-U to stop here); + * launching sync command tool such as gunter; + -L --lock Will stop right after locking, e.g. will not try to do sync whatsoever. Will keep lock until interrupted. - -d --stop-at-upload Will lock and upload files into specified intermediate - directory, then stop. - -C --command Run specified command on all hosts. + -U --upload Upload files to specified directory and exit. + -C --command Run specified command on all hosts and exit. Required options: - -o Target host in format [@][:]. + -o --host Target host in format [@][:]. If value is started from '/' or from './', then it's considered file which should be used to read hosts from. @@ -77,11 +101,6 @@ Required options: Options: -h --help Show this help. - -n --dry-run Dry run: upload files on hosts and run gunter in dry - run mode. No real files will be replaced. Temporary - files will be deleted. Guntalina will be launched in - dry mode too. - -b --no-backup Do not backup of modified files on each target host. -k --key Identity file (private key), which will be used for authentication. If '-k' option is not used, then password authentication will be used instead. @@ -113,45 +132,39 @@ Advanced options: then file will be uploaded into '/tmp/x' on the remote hosts. That option switches off that behavior. - --no-preserve-uid Do not preserve UIDs for transferred files. - --no-preserve-gid Do not preserve GIDs for transferred files. - --sync-tool Run specified post-action tool on each remote node. - Post-action tool should accept followin arguments: - * string prefix, that should be used to prefix all - stdout and stderr from the process; all unprefixed - data will be treated as control commands. - * path to directory which contain received files. - * additional arguments from the '-g' flag. - * -- - * hosts to sync, one per argument. - [default: /usr/lib/orgalorg/post-action] - -g --args Arguments to pass untouched to the post-action tool. + -n --sync-cmd Run specified sync command tool on each remote node. + Orgalorg will communicate with sync command tool via + stdin. See 'Protocol commands' below. + [default: /usr/lib/orgalorg/sync] + -g --args Arguments to pass untouched to the sync command tool. No modification will be done to the passed arg, so take care about escaping. - -m --simple-tool Treat post-action as simple tool, which is not + -m --simple Treat sync command as simple tool, which is not support specified protocol messages. No sync is possible in that case and all stdout and stderr will be passed untouched back to the orgalorg. - Excludes '--sync-tool'. + --no-preserve-uid Do not preserve UIDs for transferred files. + --no-preserve-gid Do not preserve GIDs for transferred files. Timeout options: --conn-timeout Remote host connection timeout in milliseconds. [default: 10000] --send-timeout Remote host connection data sending timeout in milliseconds. [default: 10000] + NOTE: send timeout will be also used for the + heartbeat messages, that orgalorg and connected nodes + exchanges through synchronization process. --recv-timeout Remote host connection data receiving timeout in milliseconds. [default: 10000] --keep-alive How long to keep connection keeped alive after session - end in milliseconds. [default: 60000] -Control commands: - - SYNC Cause orgalorg to broadcast 'SYNC ' to all - connected nodes. + ends. [default: 10000] ` const ( defaultSSHPort = 22 sshPasswordPrompt = "Password: " + + heartbeatTimeoutCoefficient = 0.8 ) var ( @@ -172,7 +185,7 @@ func main() { usage := strings.Replace(usage, "$USER", currentUser.Username, -1) usage = strings.Replace(usage, "$HOME", currentUser.HomeDir, -1) usage = strings.Replace(usage, "$RUNID", generateRunID(), -1) - args, err := docopt.Parse(usage, nil, true, version, false) + args, err := docopt.Parse(usage, nil, true, version, true) if err != nil { panic(err) } @@ -184,21 +197,18 @@ func main() { switch { case verbose >= verbosityDebug: logger.SetLevel(lorg.LevelDebug) + case verbose >= verbosityNormal: + logger.SetLevel(lorg.LevelInfo) } switch { - case args["-L"].(bool): - // because of docopt - args["--stop-at-lock"] = true + case args["--upload"].(bool): fallthrough - case args["--stop-at-lock"].(bool): + case args["--lock"].(bool): fallthrough - case args["-S"].(bool): + case args["--sync"].(bool): err = synchronize(args) - case args["--command"].(bool): - fallthrough - case args["-C"].(bool): err = command(args) } @@ -209,49 +219,35 @@ func main() { func command(args map[string]interface{}) error { var ( - lockFile = args["--lock-file"].(string) + stdin, _ = args["--stdin"].(string) + sudo = args["--sudo"].(bool) + commandToRun = args[""].([]string) - stdin, _ = args["--stdin"].(string) - sudo = args["--sudo"].(bool) ) if sudo { commandToRun = append([]string{"sudo", "-n"}, commandToRun...) } - runners, err := createRunnerFactory(args) - if err != nil { - return hierr.Errorf( - err, - `can't create runner factory`, - ) - } - - addresses, err := parseAddresses(args) - if err != nil { - return hierr.Errorf( - err, - `can't parse all specified addresses`, - ) - } - - debugf(`connecting to %d nodes`, len(addresses)) - - cluster, err := acquireDistributedLock(lockFile, runners, addresses) + cluster, err := connectAndLock(args) if err != nil { - return hierr.Errorf( - err, - `acquiring global cluster lock failed`, - ) + return err } - debugf(`global lock acquired on %d nodes`, len(cluster.nodes)) + return run(cluster, commandToRun, stdin) +} +func run( + cluster *distributedLock, + commandToRun []string, + stdin string, +) error { debugf(`running command`) execution, err := runRemoteExecution( cluster, commandToRun, + nil, ) if err != nil { return hierr.Errorf( @@ -282,6 +278,14 @@ func command(args map[string]interface{}) error { debugf(`waiting execution to finish`) + err = execution.stdin.Close() + if err != nil { + return hierr.Errorf( + err, + `can't close stdin`, + ) + } + err = execution.wait() if err != nil { return hierr.Errorf( @@ -295,57 +299,43 @@ func command(args map[string]interface{}) error { func synchronize(args map[string]interface{}) error { var ( - lockOnly = args["--stop-at-lock"].(bool) - uploadOnly = args["--stop-at-upload"].(bool) || args["-d"].(bool) - fileSources = args[""].([]string) + lockOnly = args["--lock"].(bool) + uploadOnly = args["--upload"].(bool) relative = args["--relative"].(bool) - lockFile = args["--lock-file"].(string) + syncCmd = args["--sync-cmd"].(string) + isSimpleCmd = args["--simple"].(bool) + stdin, _ = args["--stdin"].(string) + + fileSources = args[""].([]string) ) - addresses, err := parseAddresses(args) - if err != nil { - return hierr.Errorf( - err, - `can't parse all specified addresses`, - ) - } + var ( + filesList = []string{} + + err error + ) - filesList := []string{} if !lockOnly { - logger.Infof(`building files list from %d sources`, len(fileSources)) + debugf(`building files list from %d sources`, len(fileSources)) filesList, err = getFilesList(relative, fileSources...) if err != nil { return hierr.Errorf( err, - `can't obtain files list to sync from localhost`, + `can't build files list`, ) } - logger.Infof(`file list contains %d files`, len(filesList)) + debugf(`file list contains %d files`, len(filesList)) + tracef(`files to upload: %+v`, filesList) } - runners, err := createRunnerFactory(args) + cluster, err := connectAndLock(args) if err != nil { - return hierr.Errorf( - err, - `can't create runner factory`, - ) - } - - infof(`connecting to %d nodes`, len(addresses)) - - cluster, err := acquireDistributedLock(lockFile, runners, addresses) - if err != nil { - return hierr.Errorf( - err, - `acquiring global cluster lock failed`, - ) + return err } - logger.Infof(`global lock acquired on %d nodes`, len(cluster.nodes)) - if lockOnly { - logger.Warning("-L|--stop-at-lock was passed, waiting for interrupt...") + warningf("-L|--lock was passed, waiting for interrupt...") wait := sync.WaitGroup{} wait.Add(1) @@ -362,10 +352,33 @@ func synchronize(args map[string]interface{}) error { ) } - logger.Info(`upload done`) + tracef(`upload done`) if uploadOnly { - logger.Warning("-d|--stop-at-upload was passed, finishing...") + return nil + } + + tracef(`starting sync tool`) + + syncCmdParsed, err := shellwords.NewParser().Parse(syncCmd) + if err != nil { + return hierr.Errorf( + err, + `can't parse sync tool command: '%s'`, + syncCmd, + ) + } + + if isSimpleCmd { + return run(cluster, syncCmdParsed, stdin) + } else { + err := runSyncProtocol(cluster, syncCmdParsed) + if err != nil { + return hierr.Errorf( + err, + `failed to run sync command`, + ) + } } return nil @@ -377,13 +390,14 @@ func upload( filesList []string, ) error { var ( - rootDir = args["--root"].(string) + rootDir = args["--root"].(string) + sudo = args["--sudo"].(bool) + preserveUID = !args["--no-preserve-uid"].(bool) preserveGID = !args["--no-preserve-gid"].(bool) - sudo = args["--sudo"].(bool) ) - logger.Infof(`file upload started into: '%s'`, rootDir) + debugf(`file upload started into: '%s'`, rootDir) receivers, err := startArchiveReceivers(cluster, rootDir, sudo) if err != nil { @@ -406,18 +420,79 @@ func upload( ) } - logger.Info(`waiting file upload to finish`) + tracef(`waiting file upload to finish`) + + err = receivers.stdin.Close() + if err != nil { + return hierr.Errorf( + err, + `can't close archive receiver stdin`, + ) + } + err = receivers.wait() if err != nil { return hierr.Errorf( err, - `can't finish files archive`, + `archive upload failed`, ) } return nil } +func connectAndLock(args map[string]interface{}) (*distributedLock, error) { + var ( + lockFile = args["--lock-file"].(string) + sendTimeout = args["--send-timeout"].(string) + noLockFail = args["--no-lock-abort"].(bool) + ) + + addresses, err := parseAddresses(args) + if err != nil { + return nil, hierr.Errorf( + err, + `can't parse all specified addresses`, + ) + } + + runners, err := createRunnerFactory(args) + if err != nil { + return nil, hierr.Errorf( + err, + `can't create runner factory`, + ) + } + + debugf(`connecting to %d nodes`, len(addresses)) + + cluster, err := acquireDistributedLock( + lockFile, + runners, + addresses, + noLockFail, + ) + if err != nil { + return nil, hierr.Errorf( + err, + `acquiring global cluster lock failed`, + ) + } + + debugf(`global lock acquired on %d nodes`, len(cluster.nodes)) + + // err already checked in the timeouts.go + heartbeatMilliseconds, _ := strconv.Atoi(sendTimeout) + + cluster.runHeartbeats( + time.Duration( + float64(heartbeatMilliseconds)*heartbeatTimeoutCoefficient, + ) * time.Millisecond, + ) + + return cluster, nil +} + func createRunnerFactory(args map[string]interface{}) (runnerFactory, error) { var ( sshKeyPath, _ = args["--key"].(string) @@ -464,7 +539,7 @@ func createRunnerFactory(args map[string]interface{}) (runnerFactory, error) { func parseAddresses(args map[string]interface{}) ([]address, error) { var ( defaultUser = args["--user"].(string) - hosts = args["-o"].([]string) + hosts = args["--host"].([]string) fromStdin = args["--read-stdin"].(bool) ) diff --git a/multiwrite_closer.go b/multiwrite_closer.go index 7b47502..9de7062 100644 --- a/multiwrite_closer.go +++ b/multiwrite_closer.go @@ -10,16 +10,28 @@ type multiWriteCloser struct { writers []io.WriteCloser } -func (closer multiWriteCloser) Write(data []byte) (int, error) { - writers := []io.Writer{} +func (closer *multiWriteCloser) Write(data []byte) (int, error) { + errs := []string{} + for _, writer := range closer.writers { - writers = append(writers, writer) + _, err := writer.Write(data) + if err != nil && err != io.EOF { + errs = append(errs, err.Error()) + } + } + + if len(errs) > 0 { + return 0, fmt.Errorf( + "%d errors: %s", + len(errs), + strings.Join(errs, "; "), + ) } - return io.MultiWriter(writers...).Write(data) + return len(data), nil } -func (closer multiWriteCloser) Close() error { +func (closer *multiWriteCloser) Close() error { errs := []string{} for _, closer := range closer.writers { @@ -33,7 +45,7 @@ func (closer multiWriteCloser) Close() error { return fmt.Errorf( "%d errors: %s", len(errs), - strings.Join(errs, ";"), + strings.Join(errs, "; "), ) } diff --git a/nop_closer.go b/nop_closer.go new file mode 100644 index 0000000..e934ec6 --- /dev/null +++ b/nop_closer.go @@ -0,0 +1,13 @@ +package main + +import ( + "io" +) + +type nopCloser struct { + io.Writer +} + +func (closer nopCloser) Close() error { + return nil +} diff --git a/prefix_writer.go b/prefix_writer.go deleted file mode 100644 index eff78c1..0000000 --- a/prefix_writer.go +++ /dev/null @@ -1,61 +0,0 @@ -package main - -import ( - "bytes" - "io" -) - -type prefixWriter struct { - writer io.Writer - prefix string - - streamStarted bool - lineIncomplete bool -} - -func newPrefixWriter(writer io.Writer, prefix string) *prefixWriter { - return &prefixWriter{ - writer: writer, - prefix: prefix, - } -} - -func (writer *prefixWriter) Write(data []byte) (int, error) { - reader := bytes.NewBuffer(data) - eof := false - for !eof { - line, err := reader.ReadString('\n') - if err != nil { - if err != io.EOF { - return 0, err - } else { - eof = true - } - } - - if line == "" { - continue - } - - if !writer.streamStarted { - line = writer.prefix + line - - writer.streamStarted = true - } else { - if !writer.lineIncomplete { - line = writer.prefix + line - } - - if eof { - writer.lineIncomplete = true - } - } - - _, err = writer.writer.Write([]byte(line)) - if err != nil { - return 0, err - } - } - - return len(data), nil -} diff --git a/protocol_node_writer.go b/protocol_node_writer.go new file mode 100644 index 0000000..1f099a0 --- /dev/null +++ b/protocol_node_writer.go @@ -0,0 +1,77 @@ +package main + +import ( + "bufio" + "bytes" + "io" +) + +type protocolNodeWriter struct { + node *remoteExecutionNode + protocol *syncProtocol + + stdout io.Writer + + buffer *bytes.Buffer +} + +func newProtocolNodeWriter( + node *remoteExecutionNode, + protocol *syncProtocol, +) *protocolNodeWriter { + return &protocolNodeWriter{ + node: node, + stdout: node.stdout, + protocol: protocol, + buffer: &bytes.Buffer{}, + } +} + +func (writer *protocolNodeWriter) Write(data []byte) (int, error) { + written, err := writer.buffer.Write(data) + if err != nil { + return written, err + } + + reader := bufio.NewReader(writer.buffer) + + for { + line, err := reader.ReadString('\n') + if err != nil { + if err == io.EOF { + _, err := io.WriteString(writer.buffer, line) + if err != nil { + return 0, err + } + + break + } + } + + switch { + case writer.protocol.IsSyncCommand(line): + tracef( + "%s sent sync command: '%s'", + writer.node.String(), + line, + ) + + err := writer.protocol.SendSync(writer.node, line) + + if err != nil { + return 0, err + } + default: + _, err := io.WriteString(writer.stdout, line) + if err != nil { + return 0, err + } + } + } + + return written, nil +} + +func (writer *protocolNodeWriter) Close() error { + return nil +} diff --git a/remote_execution.go b/remote_execution.go index a269b41..b800c20 100644 --- a/remote_execution.go +++ b/remote_execution.go @@ -20,14 +20,6 @@ type remoteExecutionResult struct { func (execution *remoteExecution) wait() error { tracef("waiting %d nodes to finish", len(execution.nodes)) - err := execution.stdin.Close() - if err != nil { - return hierr.Errorf( - err, - `can't close stdin stream`, - ) - } - results := make(chan *remoteExecutionResult, 0) for _, node := range execution.nodes { go func(node *remoteExecutionNode) { @@ -44,7 +36,7 @@ func (execution *remoteExecution) wait() error { result.node.node.String(), ) } else { - infof( + tracef( `%s has successfully finished execution`, result.node.node.String(), ) diff --git a/remote_execution_node.go b/remote_execution_node.go index 6db0461..306388c 100644 --- a/remote_execution_node.go +++ b/remote_execution_node.go @@ -21,9 +21,9 @@ type remoteExecutionNode struct { func (node *remoteExecutionNode) wait() error { err := node.command.Wait() - _ = node.stdout.Close() - _ = node.stderr.Close() if err != nil { + _ = node.stdout.Close() + _ = node.stderr.Close() if sshErr, ok := err.(*ssh.ExitError); ok { return fmt.Errorf( `%s had failed to evaluate command, `+ @@ -35,10 +35,32 @@ func (node *remoteExecutionNode) wait() error { return hierr.Errorf( err, - `%s failed to receive archive, unexpected error`, + `%s failed to finish execution, unexpected error`, + node.node.String(), + ) + } + + err = node.stdout.Close() + if err != nil { + return hierr.Errorf( + err, + `%s can't close stdout`, + node.node.String(), + ) + } + + err = node.stderr.Close() + if err != nil { + return hierr.Errorf( + err, + `%s can't close stderr`, node.node.String(), ) } return nil } + +func (node *remoteExecutionNode) String() string { + return node.node.String() +} diff --git a/run_tests b/run_tests index 57b8796..66ee345 100755 --- a/run_tests +++ b/run_tests @@ -15,6 +15,18 @@ include tests/ssh.sh include tests/build.sh include tests/orgalorg.sh +which brctl >/dev/null 2>&1 +if [ $? -ne 0 ]; then + echo "missing dependency: brctl (bridge-utils)" + exit 1 +fi + +which hastur >/dev/null 2>&1 +if [ $? -ne 0 ]; then + echo "missing dependency: hastur" + exit 1 +fi + test-runner:set-custom-opts \ --keep-containers \ --keep-images \ @@ -23,7 +35,8 @@ test-runner:set-custom-opts \ test-runner:handle-custom-opt() { case "$1" in --keep-containers) - hastur:keep-containers + containers:keep-containers + hastur:keep-images ;; --keep-images) @@ -36,17 +49,11 @@ test-runner:handle-custom-opt() { esac } -which brctl >/dev/null 2>&1 -if [ $? -ne 0 ]; then - echo "missing dependency: brctl (bridge-utils)" - exit 1 -fi +progress:spinner:new _progress_spinner -which hastur >/dev/null 2>&1 -if [ $? -ne 0 ]; then - echo "missing dependency: hastur" - exit 1 -fi +test-runner:progress() { + progress:spinner:spin "$_progress_spinner" > /dev/null +} :init() { :build:init @@ -57,11 +64,11 @@ fi :cleanup() { containers:wipe + hastur:destroy-root + progress:spinner:stop "$_progress_spinner" } -progress:spinner:new _progress_spinner - :init 2> >(progress:spinner:spin "$_progress_spinner" > /dev/null) trap :cleanup EXIT diff --git a/sync.go b/sync.go new file mode 100644 index 0000000..260d971 --- /dev/null +++ b/sync.go @@ -0,0 +1,58 @@ +package main + +import "github.com/seletskiy/hierr" + +func runSyncProtocol(cluster *distributedLock, command []string) error { + protocol := newSyncProtocol() + + execution, err := runRemoteExecution( + cluster, + command, + func(remoteNode *remoteExecutionNode) { + remoteNode.stdout = newProtocolNodeWriter(remoteNode, protocol) + }, + ) + if err != nil { + return hierr.Errorf( + err, + `can't run sync tool command`, + ) + } + + err = protocol.Init(execution.stdin) + if err != nil { + return hierr.Errorf( + err, + `can't init protocol with sync tool`, + ) + } + + for _, node := range execution.nodes { + err := protocol.SendNode(node) + if err != nil { + return hierr.Errorf( + err, + `can't send node to sync tool: '%s'`, + node.String(), + ) + } + } + + err = protocol.SendStart() + if err != nil { + return hierr.Errorf( + err, + `can't start sync tool`, + ) + } + + err = execution.wait() + if err != nil { + return hierr.Errorf( + err, + `failed to finish sync tool command`, + ) + } + + return nil +} diff --git a/sync_protocol.go b/sync_protocol.go new file mode 100644 index 0000000..c7fb37c --- /dev/null +++ b/sync_protocol.go @@ -0,0 +1,113 @@ +package main + +import ( + "bytes" + "fmt" + "io" + "strings" + "time" +) + +var ( + syncProtocolPrefix = "ORGALORG" + syncProtocolHello = "HELLO" + syncProtocolNode = "NODE" + syncProtocolStart = "START" + syncProtocolSync = "SYNC" +) + +type syncProtocol struct { + node *remoteExecutionNode + + input *bytes.Buffer + output io.WriteCloser + + prefix string +} + +func newSyncProtocol() *syncProtocol { + return &syncProtocol{ + input: &bytes.Buffer{}, + prefix: fmt.Sprintf( + "%s:%d", + syncProtocolPrefix, + time.Now().UnixNano(), + ), + } +} + +func (protocol *syncProtocol) Close() error { + return nil +} + +func (protocol *syncProtocol) Init(output io.WriteCloser) error { + protocol.output = output + + _, err := io.WriteString( + protocol.output, + protocol.prefix+" "+syncProtocolHello+"\n", + ) + if err != nil { + return protocolSuspendEOF(err) + } + + return nil +} + +func (protocol *syncProtocol) SendNode(node *remoteExecutionNode) error { + _, err := io.WriteString( + protocol.output, + protocol.prefix+" "+syncProtocolNode+" "+node.String()+"\n", + ) + if err != nil { + return protocolSuspendEOF(err) + } + + return nil +} + +func (protocol *syncProtocol) SendStart() error { + _, err := io.WriteString( + protocol.output, + protocol.prefix+" "+syncProtocolStart+"\n", + ) + if err != nil { + return protocolSuspendEOF(err) + } + + return nil +} + +func (protocol *syncProtocol) IsSyncCommand(line string) bool { + return strings.HasPrefix(line, protocol.prefix+" "+syncProtocolSync) +} + +func (protocol *syncProtocol) SendSync( + source *remoteExecutionNode, + sync string, +) error { + data := strings.TrimSpace( + strings.TrimPrefix(sync, protocol.prefix+" "+syncProtocolSync), + ) + + _, err := io.WriteString( + protocol.output, + protocol.prefix+" "+syncProtocolSync+" "+source.String()+" "+data+"\n", + ) + + if err != nil { + return protocolSuspendEOF(err) + } + + return nil +} + +// Suspend EOF for be compatible with simple commands, that are not support +// protocol, and therefore can close exit earlier, than protocol is initiated. +func protocolSuspendEOF(err error) error { + if err == io.EOF { + return nil + } + + return err +} diff --git a/tests/orgalorg.sh b/tests/orgalorg.sh index 96e5b54..144c56b 100644 --- a/tests/orgalorg.sh +++ b/tests/orgalorg.sh @@ -2,6 +2,8 @@ orgalorg_user="orgalorg" -:orgalorg-key() { +:orgalorg:with-key() { + tests:debug "!!! connecting to hosts: ${ips[@]}" + orgalorg -u $orgalorg_user ${ips[*]/#/-o} -k "$(:ssh:get-key)" "${@}" } diff --git a/tests/setup.sh b/tests/setup.sh index 1bcaf94..56c7b81 100644 --- a/tests/setup.sh +++ b/tests/setup.sh @@ -43,7 +43,18 @@ tests:debug "[$container_name] sshd is offline" done - tests:debug "[$container_name] sshs is online" + tests:debug "[$container_name] sshd is online" +} + +:install-sync-command-into-container() { + local file_name="$1" + local container_name="$2" + + containers:get-rootfs rootfs "$container_name" + + tests:ensure sudo mkdir -p "$rootfs/usr/lib/orgalorg/" + tests:ensure sudo cp "$file_name" "$rootfs/usr/lib/orgalorg/" + tests:ensure sudo chmod +x "$rootfs/usr/lib/orgalorg/sync" } tests:debug "!!! setup" diff --git a/tests/ssh.sh b/tests/ssh.sh index f09708d..e2a5418 100644 --- a/tests/ssh.sh +++ b/tests/ssh.sh @@ -48,6 +48,7 @@ ssh \ -oStrictHostKeyChecking=no \ -oPasswordAuthentication=no \ + -oControlPath=none \ -i "$identity" \ -l "$user" \ "$ip" "${@}" diff --git a/tests/testcases/commands/can-escape-space-in-the-remote-command.test.sh b/tests/testcases/commands/can-escape-space-in-the-remote-command.test.sh index b371659..9b00d00 100644 --- a/tests/testcases/commands/can-escape-space-in-the-remote-command.test.sh +++ b/tests/testcases/commands/can-escape-space-in-the-remote-command.test.sh @@ -1,3 +1,3 @@ -tests:ensure :orgalorg-key -e -C echo 'two spaces' +tests:ensure :orgalorg:with-key -e -C echo 'two spaces' tests:assert-stdout "two spaces" diff --git a/tests/testcases/commands/can-run-command-under-sudo.test.sh b/tests/testcases/commands/can-run-command-under-sudo.test.sh index e47abc8..bedbac8 100644 --- a/tests/testcases/commands/can-run-command-under-sudo.test.sh +++ b/tests/testcases/commands/can-run-command-under-sudo.test.sh @@ -1,7 +1,7 @@ -tests:ensure :orgalorg-key -C 'whoami' +tests:ensure :orgalorg:with-key -C 'whoami' containers:do tests:assert-stdout "$orgalorg_user" -tests:ensure :orgalorg-key -x -C 'whoami' +tests:ensure :orgalorg:with-key -x -C 'whoami' containers:do tests:assert-stdout "root" diff --git a/tests/testcases/commands/can-run-remote-command-and-pass-stdin-to-it.test.sh b/tests/testcases/commands/can-run-remote-command-and-pass-stdin-to-it.test.sh index e83b1ab..f8890f4 100644 --- a/tests/testcases/commands/can-run-remote-command-and-pass-stdin-to-it.test.sh +++ b/tests/testcases/commands/can-run-remote-command-and-pass-stdin-to-it.test.sh @@ -1,7 +1,7 @@ -tests:ensure :orgalorg-key -C -- wc -l +tests:ensure :orgalorg:with-key -C -- wc -l containers:do tests:assert-stdout "0" -tests:ensure :orgalorg-key -C -i <(echo 1) -- wc -l +tests:ensure :orgalorg:with-key -C -i <(echo 1) -- wc -l containers:do tests:assert-stdout "1" diff --git a/tests/testcases/commands/can-run-remote-command.test.sh b/tests/testcases/commands/can-run-remote-command.test.sh index e98d531..45f8d49 100644 --- a/tests/testcases/commands/can-run-remote-command.test.sh +++ b/tests/testcases/commands/can-run-remote-command.test.sh @@ -1,3 +1,3 @@ -tests:ensure :orgalorg-key -e -C pwd +tests:ensure :orgalorg:with-key -e -C pwd tests:assert-stdout "/home/orgalorg" diff --git a/tests/testcases/commands/can-run-shell-command.test.sh b/tests/testcases/commands/can-run-shell-command.test.sh index b00f57c..4cfe9c7 100644 --- a/tests/testcases/commands/can-run-shell-command.test.sh +++ b/tests/testcases/commands/can-run-shell-command.test.sh @@ -1,3 +1,3 @@ -tests:ensure :orgalorg-key -C -- echo -n 1 '&&' echo 2 +tests:ensure :orgalorg:with-key -C -- echo -n 1 '&&' echo 2 tests:assert-stdout-re "${ips[0]} 12" diff --git a/tests/testcases/commands/will-flush-lines-so-they-do-not-interleave.test.sh b/tests/testcases/commands/will-flush-lines-so-they-do-not-interleave.test.sh index 1d6295b..20bca22 100644 --- a/tests/testcases/commands/will-flush-lines-so-they-do-not-interleave.test.sh +++ b/tests/testcases/commands/will-flush-lines-so-they-do-not-interleave.test.sh @@ -1,6 +1,6 @@ # checking, that lines from different remote sources do not interleave # each other -tests:ensure :orgalorg-key -C \ +tests:ensure :orgalorg:with-key -C \ seq 1 10000 '|' awk '{print $1}' '|' sort '|' uniq '|' wc -l tests:assert-no-diff "$(containers:count)" stdout diff --git a/tests/testcases/locking/can-acquire-global-lock.test.sh b/tests/testcases/locking/can-acquire-global-lock.test.sh index 73349f1..9104b7f 100644 --- a/tests/testcases/locking/can-acquire-global-lock.test.sh +++ b/tests/testcases/locking/can-acquire-global-lock.test.sh @@ -2,7 +2,7 @@ orgalorg_output="$(tests:get-tmp-dir)/oralorg.stdout" tests:run-background orgalorg \ tests:silence tests:pipe \ - :orgalorg-key --stop-at-lock '2>&1' '|' tee $orgalorg_output + :orgalorg:with-key --lock '2>&1' '|' tee $orgalorg_output while ! cat "$orgalorg_output" 2>/dev/null | grep -qF "waiting for interrupt" do @@ -12,7 +12,7 @@ done tests:debug "[orgalorg] global lock has been acquired" -tests:not tests:ensure :orgalorg-key --stop-at-lock +tests:not tests:ensure :orgalorg:with-key --lock tests:assert-stderr "lock already" orgalorg_pid=$(tests:get-background-pid "$orgalorg") diff --git a/tests/testcases/locking/will-continue-execution-when-lock-failed-if-flag-is-specified.test.sh b/tests/testcases/locking/will-continue-execution-when-lock-failed-if-flag-is-specified.test.sh new file mode 100644 index 0000000..d94d0df --- /dev/null +++ b/tests/testcases/locking/will-continue-execution-when-lock-failed-if-flag-is-specified.test.sh @@ -0,0 +1,25 @@ +orgalorg_output="$(tests:get-tmp-dir)/oralorg.stdout" + +tests:run-background orgalorg \ + tests:silence tests:pipe \ + :orgalorg:with-key --lock '2>&1' '|' tee $orgalorg_output + +while ! cat "$orgalorg_output" 2>/dev/null | grep -qF "waiting for interrupt" +do + tests:debug "[orgalorg] waiting for global lock..." + sleep 0.1 +done + +tests:debug "[orgalorg] global lock has been acquired" + +tests:not tests:ensure :orgalorg:with-key -C -- echo 1 +tests:assert-stderr "continuing" +tests:assert-stdout "1" + +orgalorg_pid=$(tests:get-background-pid "$orgalorg") + +pkill -INT -P "$orgalorg_pid" + +_exited_with_ctrl_c=130 + +wait "$orgalorg_pid" || tests:assert-equals "$_exited_with_ctrl_c" "$?" diff --git a/tests/testcases/newlines/do-append-newlines-in-non-quite-mode-when-necessary.test.sh b/tests/testcases/newlines/do-append-newlines-in-non-quite-mode-when-necessary.test.sh index e584bb3..f589290 100644 --- a/tests/testcases/newlines/do-append-newlines-in-non-quite-mode-when-necessary.test.sh +++ b/tests/testcases/newlines/do-append-newlines-in-non-quite-mode-when-necessary.test.sh @@ -1,3 +1,3 @@ -tests:ensure :orgalorg-key -C -- echo -n hello \| wc -l +tests:ensure :orgalorg:with-key -C -- echo -n hello '|' wc -l tests:assert-stdout "$(containers:count)" diff --git a/tests/testcases/newlines/do-not-append-newlines-if-they-are-not-present-in-remote-output-at-quiet-mode.test.sh b/tests/testcases/newlines/do-not-append-newlines-if-they-are-not-present-in-remote-output-at-quiet-mode.test.sh index b2568e9..64297d9 100644 --- a/tests/testcases/newlines/do-not-append-newlines-if-they-are-not-present-in-remote-output-at-quiet-mode.test.sh +++ b/tests/testcases/newlines/do-not-append-newlines-if-they-are-not-present-in-remote-output-at-quiet-mode.test.sh @@ -1,4 +1,4 @@ -tests:ensure :orgalorg-key -q -C -- echo -n hello +tests:ensure :orgalorg:with-key -q -C -- echo -n hello tests:debug $(containers:count) diff --git a/tests/testcases/sync/can-run-simple-command-after-files-sync-even-in-protocol-mode.test.sh b/tests/testcases/sync/can-run-simple-command-after-files-sync-even-in-protocol-mode.test.sh new file mode 100644 index 0000000..2c92d75 --- /dev/null +++ b/tests/testcases/sync/can-run-simple-command-after-files-sync-even-in-protocol-mode.test.sh @@ -0,0 +1,26 @@ +tests:make-tmp-dir dir + +tests:put dir/test-file <&2' if it's passed without prefix -tests:ensure :orgalorg-key -v -C -- echo 1 \; echo err\>\&2 +tests:ensure :orgalorg:with-key -v -C -- echo 1 \; echo err\>\&2 tests:assert-stderr " 1" tests:assert-stderr " err" diff --git a/tests/testcases/verbosity/can-output-ip-in-response-from-remote-server-when-running-command.test.sh b/tests/testcases/verbosity/can-output-ip-in-response-from-remote-server-when-running-command.test.sh index 762ee52..4cad847 100644 --- a/tests/testcases/verbosity/can-output-ip-in-response-from-remote-server-when-running-command.test.sh +++ b/tests/testcases/verbosity/can-output-ip-in-response-from-remote-server-when-running-command.test.sh @@ -1,3 +1,3 @@ -tests:ensure :orgalorg-key -C pwd +tests:ensure :orgalorg:with-key -C pwd containers:do tests:assert-stdout-re "${ips[0]} /home/orgalorg" diff --git a/tests/testcases/verbosity/do-not-output-ip-in-quiet-mode.test.sh b/tests/testcases/verbosity/do-not-output-ip-in-quiet-mode.test.sh index 9c67b34..df82620 100644 --- a/tests/testcases/verbosity/do-not-output-ip-in-quiet-mode.test.sh +++ b/tests/testcases/verbosity/do-not-output-ip-in-quiet-mode.test.sh @@ -1,3 +1,3 @@ -tests:ensure :orgalorg-key -q -C pwd +tests:ensure :orgalorg:with-key -q -C pwd tests:assert-stdout-re "^/home/orgalorg$" diff --git a/vendor/github.com/reconquest/containers.bash b/vendor/github.com/reconquest/containers.bash index 89e6d08..52fe681 160000 --- a/vendor/github.com/reconquest/containers.bash +++ b/vendor/github.com/reconquest/containers.bash @@ -1 +1 @@ -Subproject commit 89e6d082d605db718dc4befd3009061fe42b2896 +Subproject commit 52fe6816e75681e25228c338e472e586c8ed7339 diff --git a/vendor/github.com/reconquest/hastur.bash b/vendor/github.com/reconquest/hastur.bash index 42498bb..ccd2cad 160000 --- a/vendor/github.com/reconquest/hastur.bash +++ b/vendor/github.com/reconquest/hastur.bash @@ -1 +1 @@ -Subproject commit 42498bb537c4868f2c5aeddf9689e6b868ee7207 +Subproject commit ccd2cad523fa47b799fe5eb51a53ba851baaaed1 diff --git a/vendor/github.com/reconquest/test-runner.bash b/vendor/github.com/reconquest/test-runner.bash index 34b84e6..94e2ed2 160000 --- a/vendor/github.com/reconquest/test-runner.bash +++ b/vendor/github.com/reconquest/test-runner.bash @@ -1 +1 @@ -Subproject commit 34b84e6c70553a01ae7318c62d75ce57defc528b +Subproject commit 94e2ed25a14ed739e76c8f146722975d2a315c75 diff --git a/writer_wrapper.go b/writer_wrapper.go new file mode 100644 index 0000000..8bbcbb8 --- /dev/null +++ b/writer_wrapper.go @@ -0,0 +1,17 @@ +package main + +import ( + "io" +) + +type writerWrapper struct { + writer io.WriteCloser +} + +func (wrapper *writerWrapper) Write(data []byte) (int, error) { + return wrapper.writer.Write(data) +} + +func (wrapper *writerWrapper) Close() error { + return wrapper.writer.Close() +}