From e9f1b625cacfc0ea7cff9bb6a3a561b3b8fb5788 Mon Sep 17 00:00:00 2001 From: Stanislav Seletskiy Date: Tue, 24 May 2016 17:22:07 +0600 Subject: [PATCH] uploading files --- .gitmodules | 6 + address.go | 90 ++++++ archive.go | 256 +++++++++++++++- concurrency.go | 43 +++ concurrency_test.go | 53 ++++ debug.go | 6 + debug_writer.go | 21 ++ distributed_lock.go | 45 +++ distributed_lock_node.go | 123 ++++++++ line_flush_writer.go | 66 ++++ lock.go | 166 ++-------- main.go | 284 ++++++++++++------ multiwrite_closer.go | 41 +++ prefix_writer.go | 33 ++ run_tests.sh | 20 +- runner_factory.go | 41 +++ tests/build.sh | 10 + tests/containers.sh | 68 ----- tests/hastur.sh | 86 ------ tests/orgalorg.sh | 7 + tests/setup.sh | 16 +- tests/ssh.sh | 6 +- tests/sudo.sh | 18 -- tests/teardown.sh | 1 + .../testcases/can-acquire-global-lock.test.sh | 10 +- tests/testcases/can-upload-directory.test.sh | 22 ++ ...d-file-by-absolute-path-by-default.test.sh | 16 + .../testcases/can-upload-single-file.test.sh | 12 + tests/testcases/can-upload-two-files.test.sh | 18 ++ timeouts.go | 61 ++++ utils.go | 38 --- vendor/github.com/reconquest/containers.bash | 1 + vendor/github.com/reconquest/hastur.bash | 1 + vendor/github.com/reconquest/progress.bash | 2 +- vendor/github.com/reconquest/test-runner.bash | 2 +- 35 files changed, 1206 insertions(+), 483 deletions(-) create mode 100644 address.go create mode 100644 concurrency.go create mode 100644 concurrency_test.go create mode 100644 debug.go create mode 100644 debug_writer.go create mode 100644 distributed_lock.go create mode 100644 distributed_lock_node.go create mode 100644 line_flush_writer.go create mode 100644 multiwrite_closer.go create mode 100644 prefix_writer.go create mode 100644 runner_factory.go create mode 100644 tests/build.sh delete mode 100644 tests/containers.sh delete mode 100644 tests/hastur.sh create mode 100644 tests/orgalorg.sh delete mode 100644 tests/sudo.sh create mode 100644 tests/teardown.sh create mode 100644 tests/testcases/can-upload-directory.test.sh create mode 100644 tests/testcases/can-upload-file-by-absolute-path-by-default.test.sh create mode 100644 tests/testcases/can-upload-single-file.test.sh create mode 100644 tests/testcases/can-upload-two-files.test.sh create mode 100644 timeouts.go delete mode 100644 utils.go create mode 160000 vendor/github.com/reconquest/containers.bash create mode 160000 vendor/github.com/reconquest/hastur.bash diff --git a/.gitmodules b/.gitmodules index 958c985..84c2255 100644 --- a/.gitmodules +++ b/.gitmodules @@ -10,3 +10,9 @@ [submodule "vendor/github.com/reconquest/tests.sh"] path = vendor/github.com/reconquest/tests.sh url = https://github.com/reconquest/tests.sh +[submodule "vendor/github.com/reconquest/hastur.bash"] + path = vendor/github.com/reconquest/hastur.bash + url = https://github.com/reconquest/hastur.bash +[submodule "vendor/github.com/reconquest/containers.bash"] + path = vendor/github.com/reconquest/containers.bash + url = https://github.com/reconquest/containers.bash diff --git a/address.go b/address.go new file mode 100644 index 0000000..9121d77 --- /dev/null +++ b/address.go @@ -0,0 +1,90 @@ +package main + +import ( + "fmt" + "regexp" + "strconv" + + "github.com/seletskiy/hierr" +) + +type address struct { + user string + domain string + port int +} + +func (address address) String() string { + return fmt.Sprintf( + "[%s@%s:%d]", + address.user, + address.domain, + address.port, + ) +} + +func parseAddress( + host string, defaultUser string, defaultPort int, +) (address, error) { + 
hostRegexp := regexp.MustCompile(`^(?:([^@]+)@)?(.*?)(?::(\d+))?$`) + + matches := hostRegexp.FindStringSubmatch(host) + + var ( + user = defaultUser + domain = matches[2] + rawPort = matches[3] + port = defaultPort + ) + + if matches[1] != "" { + user = matches[1] + } + + if rawPort != "" { + var err error + port, err = strconv.Atoi(rawPort) + if err != nil { + return address{}, hierr.Errorf( + err, + `can't parse port number: '%s'`, rawPort, + ) + } + } + + return address{ + user: user, + domain: domain, + port: port, + }, nil +} + +func uniqAddresses(addresses []address) []address { + result := []address{} + + for _, origin := range addresses { + keep := true + + for _, another := range result { + if origin.user != another.user { + continue + } + + if origin.domain != another.domain { + continue + } + + if origin.port != another.port { + continue + } + + keep = false + } + + if keep { + result = append(result, origin) + } + } + + return result +} diff --git a/archive.go b/archive.go index 0b94933..55fd1bc 100644 --- a/archive.go +++ b/archive.go @@ -2,27 +2,180 @@ package main import ( "archive/tar" + "fmt" "io" "os" + "path/filepath" "syscall" + "golang.org/x/crypto/ssh" + "github.com/seletskiy/hierr" + "github.com/theairkit/runcmd" ) -func archiveFilesToStream(target io.Writer, files []string) error { +type archiveReceiverNode struct { + node distributedLockNode + command runcmd.CmdWorker +} + +type archiveReceivers struct { + stdin io.WriteCloser + nodes []archiveReceiverNode +} + +func (receivers *archiveReceivers) wait() error { + err := receivers.stdin.Close() + if err != nil { + return hierr.Errorf( + err, + `can't close archive stream`, + ) + } + + for _, receiver := range receivers.nodes { + err := receiver.command.Wait() + if err != nil { + if sshErr, ok := err.(*ssh.ExitError); ok { + return fmt.Errorf( + `%s failed to receive archive, `+ + `remote command exited with non-zero code: %d`, + receiver.node.String(), + sshErr.Waitmsg.ExitStatus(), + ) + } else { + return hierr.Errorf( + err, + `%s failed to receive archive, unexpected error`, + receiver.node.String(), + ) + } + } + } + + return nil +} + +func startArchiveReceivers( + lockedNodes *distributedLock, + args map[string]interface{}, +) (*archiveReceivers, error) { + var ( + rootDir = args["--root"].(string) + ) + + archiveReceiverCommandString := fmt.Sprintf( + `tar -x --verbose --directory="%s"`, + rootDir, + ) + + unpackers := []io.WriteCloser{} + + nodes := []archiveReceiverNode{} + + for _, node := range lockedNodes.nodes { + debugf(hierr.Errorf( + archiveReceiverCommandString, + "%s starting archive receiver command", + node.String(), + ).Error()) + + archiveReceiverCommand, err := node.runner.Command( + archiveReceiverCommandString, + ) + + if err != nil { + return nil, hierr.Errorf( + err, + `can't create archive receiver command`, + ) + } + + stdin, err := archiveReceiverCommand.StdinPipe() + if err != nil { + return nil, hierr.Errorf( + err, + `can't get stdin from archive receiver command`, + ) + } + + unpackers = append(unpackers, stdin) + + stdout := newLineFlushWriter( + newPrefixWriter( + newDebugWriter(logger), + fmt.Sprintf("%s {tar} ", node.String()), + ), + ) + + archiveReceiverCommand.SetStdout(stdout) + + stderr := newLineFlushWriter( + newPrefixWriter( + newDebugWriter(logger), + fmt.Sprintf("%s {tar} ", node.String()), + ), + ) + + archiveReceiverCommand.SetStderr(stderr) + + err = archiveReceiverCommand.Start() + if err != nil { + return nil, hierr.Errorf( + err, + `can't start archive receiver 
command`,
+			)
+		}
+
+		nodes = append(nodes, archiveReceiverNode{
+			node:    node,
+			command: archiveReceiverCommand,
+		})
+	}
+
+	return &archiveReceivers{
+		stdin: multiWriteCloser{unpackers},
+		nodes: nodes,
+	}, nil
+}
+
+func archiveFilesToWriter(target io.Writer, files []string) error {
+	workDir, err := os.Getwd()
+	if err != nil {
+		return hierr.Errorf(
+			err,
+			`can't get current working directory`,
+		)
+	}
+
 	archive := tar.NewWriter(target)
-	for _, file := range files {
-		fileInfo, err := os.Stat(file)
+	for fileIndex, fileName := range files {
+		fileInfo, err := os.Stat(fileName)
 		if err != nil {
 			return hierr.Errorf(
 				err,
-				`can't stat file for archieving: '%s`, file,
+				`can't stat file for archiving: '%s'`, fileName,
 			)
 		}
 
-		err = archive.WriteHeader(&tar.Header{
-			Name: file,
+		// avoid tar warnings about leading slash
+		tarFileName := fileName
+		if tarFileName[0] == '/' {
+			tarFileName = tarFileName[1:]
+
+			fileName, err = filepath.Rel(workDir, fileName)
+			if err != nil {
+				return hierr.Errorf(
+					err,
+					`can't make relative path from: '%s'`,
+					fileName,
+				)
+			}
+		}
+
+		header := &tar.Header{
+			Name: tarFileName,
 
 			Mode: int64(fileInfo.Sys().(*syscall.Stat_t).Mode),
 			Size: fileInfo.Size(),
@@ -30,17 +183,62 @@ func archiveFilesToStream(target io.Writer, files []string) error {
 			Gid:  int(fileInfo.Sys().(*syscall.Stat_t).Gid),
 
 			ModTime: fileInfo.ModTime(),
-		})
+		}
+
+		logger.Infof(
+			"(%d/%d) sending file: '%s'",
+			fileIndex+1,
+			len(files),
+			fileName,
+		)
+
+		debugf(
+			hierr.Errorf(
+				fmt.Sprintf(
+					"size: %d bytes; mode: %o; uid/gid: %d/%d; modtime: %s",
+					header.Size,
+					header.Mode,
+					header.Uid,
+					header.Gid,
+					header.ModTime,
+				),
+				`local file: %s; remote file: %s`,
+				fileName,
+				tarFileName,
+			).Error(),
+		)
+
+		err = archive.WriteHeader(header)
 		if err != nil {
 			return hierr.Errorf(
 				err,
-				`can't write tar header for file: '%s'`, file,
+				`can't write tar header for file: '%s'`, fileName,
+			)
+		}
+
+		fileToArchive, err := os.Open(fileName)
+		if err != nil {
+			return hierr.Errorf(
+				err,
+				`can't open file for reading: '%s'`,
+				fileName,
+			)
+		}
+
+		_, err = io.Copy(archive, fileToArchive)
+		if err != nil {
+			return hierr.Errorf(
+				err,
+				`can't copy file to the archive: '%s'`,
+				fileName,
 			)
 		}
 	}
 
-	err := archive.Close()
+	debugf("closing archive stream, %d files sent", len(files))
+
+	err = archive.Close()
 	if err != nil {
 		return hierr.Errorf(
 			err,
@@ -50,3 +248,43 @@ func archiveFilesToStream(target io.Writer, files []string) error {
 
 	return nil
 }
+
+func getFilesList(relative bool, sources ...string) ([]string, error) {
+	files := []string{}
+
+	for _, source := range sources {
+		err := filepath.Walk(
+			source,
+			func(path string, info os.FileInfo, err error) error {
+				if err != nil {
+					return err
+				}
+
+				if info.IsDir() {
+					return nil
+				}
+
+				if !relative {
+					path, err = filepath.Abs(path)
+					if err != nil {
+						return hierr.Errorf(
+							err,
+							`can't get absolute path for local file: '%s'`,
+							path,
+						)
+					}
+				}
+
+				files = append(files, path)
+
+				return nil
+			},
+		)
+
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	return files, nil
+}
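Under the hood the upload is a single tar stream fanned out to every node: archiveFilesToWriter() writes one archive into receivers.stdin, which is a multiWriteCloser duplicating each chunk into the stdin of every remote `tar -x` process started above. A rough single-host equivalent in plain shell (host name and run directory are illustrative only):

    # what orgalorg effectively does, but into N ssh sessions at once
    tar -c some-file some-dir \
        | ssh user@node.example.com \
            'tar -x --verbose --directory="/var/run/orgalorg/files/<run-id>"'
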
diff --git a/concurrency.go b/concurrency.go
new file mode 100644
index 0000000..7c46fb5
--- /dev/null
+++ b/concurrency.go
@@ -0,0 +1,43 @@
+package main
+
+type (
+	terminator  struct{}
+	terminators chan terminator
+	job         func(terminators)
+
+	jobs chan job
+)
+
+func startWorkers(
+	amount int,
+) (jobs, []terminators) {
+	jobsChannel := make(jobs, 0)
+
+	terminatorsList := []terminators{}
+
+	for i := 0; i < amount; i++ {
+		terminatorsChannel := make(terminators, 0)
+		go worker(jobsChannel, terminatorsChannel)
+
+		terminatorsList = append(terminatorsList, terminatorsChannel)
+	}
+
+	return jobsChannel, terminatorsList
+}
+
+func stopWorkers(terminatorsList []terminators) {
+	for _, terminatorsChannel := range terminatorsList {
+		terminatorsChannel <- terminator{}
+	}
+}
+
+func worker(jobsChannel jobs, terminatorsChannel terminators) {
+	for {
+		select {
+		case job := <-jobsChannel:
+			job(terminatorsChannel)
+		case <-terminatorsChannel:
+			return
+		}
+	}
+}
diff --git a/concurrency_test.go b/concurrency_test.go
new file mode 100644
index 0000000..3973566
--- /dev/null
+++ b/concurrency_test.go
@@ -0,0 +1,53 @@
+package main
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestCanRunSingleWorker(t *testing.T) {
+	test := assert.New(t)
+
+	result := make(chan int, 0)
+
+	jobs, _ := startWorkers(1)
+
+	jobs <- func(_ terminators) {
+		result <- 2 * 2
+	}
+
+	test.Equal(<-result, 4)
+}
+
+func TestCanRunTwoWorkers(t *testing.T) {
+	test := assert.New(t)
+
+	result := make(chan int, 0)
+
+	jobs, _ := startWorkers(2)
+
+	jobs <- func(_ terminators) {
+		result <- 3 * <-result
+	}
+
+	jobs <- func(_ terminators) {
+		result <- 2 * 2
+	}
+
+	test.Equal(<-result, 12)
+}
+
+func TestCanTerminateWorkers(t *testing.T) {
+	test := assert.New(t)
+
+	result := make(chan int, 0)
+
+	jobs, terminatorsList := startWorkers(3)
+
+	terminatorsList[0] <- terminator{}
+	terminatorsList[1] <- terminator{}
+
+	jobs <- func(_ terminators) {
+		result <- 2 * 2
+	}
+
+	test.Equal(<-result, 4)
+}
diff --git a/debug.go b/debug.go
new file mode 100644
index 0000000..01aba87
--- /dev/null
+++ b/debug.go
@@ -0,0 +1,6 @@
+package main
+
+func debugf(format string, args ...interface{}) {
+	// TODO: always write debug output to a file as well
+	logger.Debugf(format, args...)
+}
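The jobs/terminators pair above is a small worker pool: a job is just a closure, and every worker owns a private termination channel it checks between jobs. A usage sketch (the helper below is hypothetical, not part of the patch, and assumes the usual fmt import):

    func exampleWorkerPool() {
        jobsChannel, terminatorsList := startWorkers(4)

        results := make(chan int)

        // the closure receives the worker's own terminators channel,
        // so a long-running job could watch it for shutdown requests
        jobsChannel <- func(_ terminators) {
            results <- 2 + 2
        }

        fmt.Println(<-results) // prints 4

        // make every idle worker leave its select loop
        stopWorkers(terminatorsList)
    }
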
diff --git a/debug_writer.go b/debug_writer.go
new file mode 100644
index 0000000..28d8754
--- /dev/null
+++ b/debug_writer.go
@@ -0,0 +1,21 @@
+package main
+
+import (
+	"github.com/kovetskiy/lorg"
+)
+
+type debugWriter struct {
+	log *lorg.Log
+}
+
+func newDebugWriter(log *lorg.Log) debugWriter {
+	return debugWriter{
+		log: log,
+	}
+}
+
+func (writer debugWriter) Write(data []byte) (int, error) {
+	writer.log.Debug(string(data))
+
+	return len(data), nil
+}
diff --git a/distributed_lock.go b/distributed_lock.go
new file mode 100644
index 0000000..43c925c
--- /dev/null
+++ b/distributed_lock.go
@@ -0,0 +1,45 @@
+package main
+
+import (
+	"strings"
+
+	"github.com/seletskiy/hierr"
+	"github.com/theairkit/runcmd"
+)
+
+type distributedLock struct {
+	nodes []distributedLockNode
+}
+
+func (lock *distributedLock) addNodeRunner(
+	runner runcmd.Runner,
+	address address,
+) error {
+	lock.nodes = append(lock.nodes, distributedLockNode{
+		address: address,
+		runner:  runner,
+	})
+
+	return nil
+}
+
+func (lock *distributedLock) acquire(filename string) error {
+	for _, node := range lock.nodes {
+		_, err := node.lock(filename)
+		if err != nil {
+			nodes := []string{}
+			for _, node := range lock.nodes {
+				nodes = append(nodes, node.String())
+			}
+
+			return hierr.Errorf(
+				err,
+				"failed to lock %d nodes: %s",
+				len(lock.nodes),
+				strings.Join(nodes, ", "),
+			)
+		}
+	}
+
+	return nil
+}
diff --git a/distributed_lock_node.go b/distributed_lock_node.go
new file mode 100644
index 0000000..0d120c1
--- /dev/null
+++ b/distributed_lock_node.go
@@ -0,0 +1,123 @@
+package main
+
+import (
+	"bufio"
+	"fmt"
+	"io"
+	"strings"
+
+	"github.com/seletskiy/hierr"
+	"github.com/theairkit/runcmd"
+)
+
+const (
+	lockAcquiredString = `acquired`
+	lockLockedString   = `locked`
+)
+
+type distributedLockNode struct {
+	address address
+	runner  runcmd.Runner
+}
+
+func (node *distributedLockNode) String() string {
+	return node.address.String()
+}
+
+type distributedLockReleaser struct {
+	lockReadStdin io.WriteCloser
+}
+
+func (node *distributedLockNode) lock(
+	filename string,
+) (*distributedLockReleaser, error) {
+	lockCommandString := fmt.Sprintf(
+		`sh -c $'`+
+			`flock -nx %s -c \'`+
+			`printf "%s\\n" && read unlock\' || `+
+			`printf "%s\\n"'`,
+		filename,
+		lockAcquiredString,
+		lockLockedString,
+	)
+
+	logger.Debug(hierr.Errorf(
+		lockCommandString,
+		`%s running lock command`,
+		node,
+	))
+
+	lockCommand, err := node.runner.Command(lockCommandString)
+	if err != nil {
+		return nil, err
+	}
+
+	stdout, err := lockCommand.StdoutPipe()
+	if err != nil {
+		return nil, hierr.Errorf(
+			err,
+			`can't get control stdout pipe from lock process`,
+		)
+	}
+
+	stderr := newLineFlushWriter(
+		newPrefixWriter(
+			newDebugWriter(logger),
+			fmt.Sprintf("%s {flock} ", node.String()),
+		),
+	)
+
+	lockCommand.SetStderr(stderr)
+
+	stdin, err := lockCommand.StdinPipe()
+	if err != nil {
+		return nil, hierr.Errorf(
+			err,
+			`can't get control stdin pipe to lock process`,
+		)
+	}
+
+	err = lockCommand.Start()
+	if err != nil {
+		return nil, hierr.Errorf(
+			err,
+			`%s can't start lock command: '%s'`,
+			node, lockCommandString,
+		)
+	}
+
+	line, err := bufio.NewReader(stdout).ReadString('\n')
+	if err != nil {
+		return nil, hierr.Errorf(
+			err,
+			`can't read line from lock process`,
+		)
+	}
+
+	switch strings.TrimSpace(line) {
+	case lockAcquiredString:
+		// pass
+
+	case lockLockedString:
+		return nil, fmt.Errorf(
+			`%s can't acquire lock, `+
+				`lock already obtained by another process `+
+				`or unavailable`,
+			node,
+		)
+
+	default:
+		return nil, fmt.Errorf(
+			`%s unexpected reply string encountered `+
+				`instead of '%s' or '%s': '%s'`,
+			node, lockAcquiredString, lockLockedString,
+			line,
+		)
+	}
+
+	logger.Debugf(`%s lock acquired`, node)
+
+	return &distributedLockReleaser{
+		lockReadStdin: stdin,
+	}, nil
+}
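Substituting the lock file path and the two control strings, the command that each node ends up executing is, in effect:

    flock -nx /path/to/lock-file -c 'printf "acquired\n" && read unlock' \
        || printf "locked\n"

flock takes a non-blocking (-n) exclusive (-x) lock; on success it prints 'acquired' and parks in `read`, holding the locked descriptor open until orgalorg closes the stdin pipe it keeps as lockReadStdin; on failure it prints 'locked', which the Go side turns into the 'lock already obtained' error.
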
diff --git a/line_flush_writer.go b/line_flush_writer.go
new file mode 100644
index 0000000..62adba5
--- /dev/null
+++ b/line_flush_writer.go
@@ -0,0 +1,66 @@
+package main
+
+import (
+	"bytes"
+	"io"
+	"sync"
+)
+
+type lineFlushWriter struct {
+	mutex  *sync.Mutex
+	writer io.Writer
+	buffer *bytes.Buffer
+}
+
+func newLineFlushWriter(writer io.Writer) lineFlushWriter {
+	return lineFlushWriter{
+		writer: writer,
+		mutex:  &sync.Mutex{},
+		buffer: &bytes.Buffer{},
+	}
+}
+
+func (writer lineFlushWriter) Write(data []byte) (int, error) {
+	writer.mutex.Lock()
+	_, err := writer.buffer.Write(data)
+	writer.mutex.Unlock()
+
+	if err != nil {
+		return 0, err
+	}
+
+	err = writer.Flush()
+	if err != nil {
+		return 0, err
+	}
+
+	return len(data), nil
+}
+
+// Flush writes out every complete line from the buffer; a trailing
+// partial line stays buffered until its newline arrives.
+func (writer lineFlushWriter) Flush() error {
+	writer.mutex.Lock()
+	defer writer.mutex.Unlock()
+
+	for bytes.Contains(writer.buffer.Bytes(), []byte("\n")) {
+		line, err := writer.buffer.ReadString('\n')
+		if err != nil {
+			return err
+		}
+
+		_, err = writer.writer.Write([]byte(line))
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (writer lineFlushWriter) Close() error {
+	return writer.Flush()
+}
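lineFlushWriter is what keeps output from several hosts line-atomic: wrapped around prefixWriter and debugWriter (used for the remote {tar} and {flock} output above), it buffers writes until at least one complete line is available and only then forwards it. A small sketch of the behavior:

    // writes shorter than a line are held back until '\n' arrives
    writer := newLineFlushWriter(os.Stderr)

    writer.Write([]byte("[node-1] par"))   // buffered, nothing written yet
    writer.Write([]byte("tial\n[node"))    // "[node-1] partial\n" is flushed
    writer.Write([]byte("-1] done\n"))     // "[node-1] done\n" is flushed
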
diff --git a/lock.go b/lock.go
index 180b6e4..c0d89b3 100644
--- a/lock.go
+++ b/lock.go
@@ -1,156 +1,46 @@
 package main
 
-import (
-	"bufio"
-	"fmt"
-	"io"
-	"strings"
-
-	"github.com/seletskiy/hierr"
-	"github.com/theairkit/runcmd"
-)
-
-const (
-	lockAcquiredString = `acquired`
-	lockLockedString   = `locked`
-)
-
-type distributedLock struct {
-	nodes []distributedLockNode
-}
-
-func (lock *distributedLock) addNodeRunner(
-	runner runcmd.Runner,
-	user string, domain string, port int,
-) error {
-	lock.nodes = append(lock.nodes, distributedLockNode{
-		user:   user,
-		domain: domain,
-		port:   port,
-		runner: runner,
-	})
+import "github.com/seletskiy/hierr"
 
-	return nil
-}
+func acquireDistributedLock(
+	args map[string]interface{},
+	runnerFactory runnerFactory,
+	addresses []address,
+) (*distributedLock, error) {
+	var (
+		lockFile = args["--lock-file"].(string)
+	)
 
-func (lock *distributedLock) acquire(filename string) error {
-	for _, node := range lock.nodes {
-		_, err := node.lock(filename)
+	lock := &distributedLock{}
+
+	for _, address := range addresses {
+		runner, err := runnerFactory(address)
 		if err != nil {
-			nodes := []string{}
-			for _, node := range lock.nodes {
-				nodes = append(nodes, node.String())
-			}
-
-			return hierr.Errorf(
+			return nil, hierr.Errorf(
 				err,
-				"failed to lock %d nodes: %s",
-				len(lock.nodes),
-				strings.Join(nodes, ", "),
+				`can't create runner for address '%s'`,
+				address,
 			)
 		}
-	}
-
-	log.Infof(`global lock acquired on %d nodes`, len(lock.nodes))
-
-	return nil
-}
-
-type distributedLockNode struct {
-	user   string
-	domain string
-	port   int
-	runner runcmd.Runner
-}
-
-func (node *distributedLockNode) String() string {
-	return fmt.Sprintf("[%s@%s:%d]", node.user, node.domain, node.port)
-}
-
-type distributedLockReleaser struct {
-	lockReadStdin io.WriteCloser
-}
-
-func (node *distributedLockNode) lock(
-	filename string,
-) (*distributedLockReleaser, error) {
-	lockCommandString := fmt.Sprintf(
-		`sh -c "`+
-			`flock -nx %s -c '`+
-			`printf \"%s\\n\" && read unlock' || `+
-			`printf \"%s\\n\""`,
-		filename,
-		lockAcquiredString,
-		lockLockedString,
-	)
 
-	log.Debug(hierr.Errorf(
-		lockCommandString,
-		`%s running lock command`,
-		node,
-	))
-
-	lockCommand, err := node.runner.Command(lockCommandString)
-	if err != nil {
-		return nil, err
-	}
-
-	stdout, err := lockCommand.StdoutPipe()
-	if err != nil {
-		return nil, hierr.Errorf(
-			err,
-			`can't get control stdout pipe from lock process`,
-		)
-	}
-
-	stdin, err := lockCommand.StdinPipe()
-	if err != nil {
-		return nil, hierr.Errorf(
-			err,
-			`can't get control stdin pipe to lock process`,
-		)
-	}
-
-	err = lockCommand.Start()
-	if err != nil {
-		return nil, hierr.Errorf(
-			err,
-			`%s can't start lock command: '%s`,
-			node, lockCommandString,
-		)
+	err = lock.addNodeRunner(runner, address)
+	if err != nil {
+		return nil, hierr.Errorf(
+			err,
+			`can't add host to the global cluster lock: '%s'`,
+			address,
+		)
+	}
 	}
 
-	line, err := bufio.NewReader(stdout).ReadString('\n')
+	err := lock.acquire(lockFile)
 	if err != nil {
 		return nil, hierr.Errorf(
 			err,
-			`can't read line from lock process`,
-		)
-	}
-
-	switch strings.TrimSpace(line) {
-	case lockAcquiredString:
-		// pass
-
-	case lockLockedString:
-		return nil, fmt.Errorf(
-			`%s can't acquire lock, `+
-				`lock already obtained by another process`,
-			node,
-		)
-
-	default:
-		return nil, fmt.Errorf(
-			`%s unexpected reply string encountered `+
-				`instead of '%s' or '%s': '%s'`,
-			node, lockAcquiredString, lockLockedString,
-			line,
+			`can't acquire global cluster lock on %d hosts`,
+			len(addresses),
 		)
 	}
 
-	log.Debugf(`%s lock acquired`, node)
-
-	return &distributedLockReleaser{
-		lockReadStdin: stdin,
-	}, nil
+	return lock, nil
 }
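Note that acquireDistributedLock() now receives the transport as a runnerFactory value (defined in runner_factory.go further below), so the locking logic no longer depends on how sessions are created. That makes it possible, for instance, to exercise it without SSH by supplying a factory built on runcmd.NewLocalRunner, the constructor used by the code being removed above. A hypothetical sketch:

    // every "node" is the local machine; useful only for testing
    localFactory := func(address address) (runcmd.Runner, error) {
        return runcmd.NewLocalRunner()
    }

    lock, err := acquireDistributedLock(args, localFactory, addresses)
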
diff --git a/main.go b/main.go
index efb3451..0425380 100644
--- a/main.go
+++ b/main.go
@@ -5,11 +5,11 @@ import (
 	"os/user"
 	"strings"
 	"sync"
+	"time"
 
 	"github.com/docopt/docopt-go"
 	"github.com/kovetskiy/lorg"
 	"github.com/seletskiy/hierr"
-	"github.com/theairkit/runcmd"
 )
 
 const version = "1.0"
@@ -17,27 +17,23 @@ const usage = `orgalorg - synchronizing files on many hosts.
 
 First of all, orgalorg will try to acquire global cluster lock by flock'ing
-root directory on each host. If at least one flock fails, then orgalorg
-will stop, unless '-k' flag is specified.
+the file specified by '--lock-file' on each host. If at least one flock
+fails, then orgalorg will stop, unless '-t' flag is specified.
 
 orgalorg will create tar-archive from specified files, keeping file attributes
 and ownerships, then upload archive in parallel to the specified hosts and
-unpacks it in the temporary directory. No further actions will be done until
-all hosts unpacks the archive.
-
-Then, gunter will be launched with that temporary directory as templates source
-directory with empty data file (e.g. no template processing will be done).
-No further actions will be taken until gunter finishes processing without
-error. All modified files will be logged host-wise in temporary log file.
-
-Then, guntalina will be launched with that log file and will apply actions,
-that are specified in guntalina config files (each host may have different
-actions).
-
-All output from guntalina will be passed back and returned on stdout.
-
-Finally, all temporary files will be removed from hosts, optionally keeping
-backup of the modified files on every host, and global lock is freed.
+unpack it in the temporary directory (see '-r'). No further actions will be
+done until all hosts unpack the archive.
+
+If no '-d' flag is specified, then after upload the post-action tool will be
+launched (see '--post-action'). The post-action tool can send stdout and
+stderr back to orgalorg, but it needs to be prefixed with a special prefix,
+passed in the first argument. Additionally, the post-action tool can separate
+its actions into several phases, which should be synced across the whole
+cluster. To do so, the tool should send a 'SYNC' message to stdout as soon as
+a phase is done. Then, the tool can receive 'SYNC <node>' back from orgalorg
+as many times as there are messages sent from other hosts. The tool can
+decide how soon it should continue based on the amount of received 'SYNC'
+messages.
 
 Restrictions:
 
@@ -46,18 +42,24 @@ Restrictions:
 
 Usage:
     orgalorg -h | --help
-    orgalorg [options] (-o <host>...|-s)... -S <files>...
-    orgalorg [options] (-o <host>...|-s)... --stop-after-lock
+    orgalorg [options] (-o <host>...|-s) -S <files>... [-d]
+    orgalorg [options] (-o <host>...|-s) -C <command>...
+    orgalorg [options] (-o <host>...|-s) (-L | --stop-at-lock)
 
 Operation modes:
-    -S                  Sync.
+    -S --sync           Sync.
                         Synchronizes files on the specified hosts via 4-stage
                         process:
-                        * global cluster locking;
+                        * global cluster locking (use -L to stop here);
                         * tar-ing files on local machine, transmitting and
-                          unpacking files to the intermediate directory;
-                        * launching copy tool (e.g. gunter/rsync);
-                        * launching action tool (e.g. guntalina);
+                          unpacking files to the intermediate directory
+                          (-d to stop here);
+                        * launching post-action tool such as gunter;
+    -L --stop-at-lock   Will stop right after locking, e.g. will not try to
+                        do sync whatsoever. Will keep lock until interrupted.
+    -d --stop-at-upload Will lock and upload files into specified intermediate
+                        directory, then stop.
+    -C --command        Run specified command on all hosts.
 
 Required options:
     -o <host>           Target host in format [<user>@]<domain>[:<port>].
@@ -71,97 +73,170 @@ Options:
                         dry mode too.
     -b --no-backup      Do not backup of modified files on each target host.
     -k --key <identity> Identity file (private key), which will be used for
-                        authentication.
-                        [default: ~/.ssh/id_rsa]
-    -p --password       Use password authentication. Password will be
-                        requested on stdin after program start.
-                        Excludes '-i' option.
+                        authentication. If '-k' option is not used, then
+                        password authentication will be used instead.
+                        [default: $HOME/.ssh/id_rsa]
+    -p --password       Enable password authentication.
+                        Excludes '-k' option.
     -x --no-sudo        Do not try to obtain root (via 'sudo -i'). By default,
                         orgalorg will try to obtain root and do all actions
                         from root, because it's most common use case. To
                         prevent that behaviour, this option can be used.
-    -l --no-lock-abort  Try to obtain global lock, but only print warning if
+    -t --no-lock-abort  Try to obtain global lock, but only print warning if
                         it cannot be done, do not stop execution.
     -r --root <root>    Specify root dir to extract files into.
-                        [default: /]
+                        [default: /var/run/orgalorg/files/$RUNID]
     -u --user <user>    Username used for connecting to all hosts by default.
                         [default: $USER]
     -v --verbose        Print debug information on stderr.
     -V --version        Print program version.
 
 Advanced options:
+    --lock-file <path>  File to put lock onto.
+                        [default: /]
+    -e --relative       Upload files by relative path. By default, all
+                        specified files will be uploaded on the target
+                        hosts by absolute paths, e.g. if you run
+                        orgalorg from '/tmp' dir with argument '-S x',
+                        then the file will be uploaded into '/tmp/x' on the
+                        remote hosts. That option switches off that
+                        behavior.
     --no-preserve-uid   Do not preserve UIDs for transferred files.
     --no-preserve-gid   Do not preserve GIDs for transferred files.
-    --backups-dir       Directory, that will be used on the remote hosts for
-                        storing backups. Backups will be stored in the
-                        subdirectory, uniquely named with source hostname
-                        and timestamp.
-                        This option is only useful without '-b'.
-                        [default: /var/orgalorg/backups/]
-    --temp-dir          Use specified directory for storing temporary data
-                        on each host.
-                        [default: /tmp/orgalorg/runs/]
-    --stop-after-lock   Will stop right after locking, e.g. will not try to
-                        do sync whatsoever. Will keep lock until interrupted.
-    --stop-after-upload Will lock and upload files into specified intermediate
-                        directory, then stop.
-    --stop-after-copy   Will lock, upload files and then run copy tool to sync
-                        files in specified directory with provided files.
-    --copy-tool         Tool for copying files from intermediate directory to
-                        the target directory.
-                        Tool will accept two arguments: source and target
-                        directories.
-                        [default: orgalorg-sync]
-    --act-tool          Tool for running post-actions after synchronizing
-                        files.
-                        Tool will accept one argument: file containing files
-                        changed while synchronization process.
-                        [default: orgalorg-act]
+    --post-action <path>
+                        Run specified post-action tool on each remote node.
+                        Post-action tool should accept following arguments:
+                        * string prefix, that should be used to prefix all
+                          stdout and stderr from the process; all unprefixed
+                          data will be treated as control commands.
+                        * path to directory which contains received files.
+                        * additional arguments from the '-g' flag.
+                        * --
+                        * hosts to sync, one per argument.
+                        [default: /usr/lib/orgalorg/post-action]
+    -g --args <args>    Arguments to pass untouched to the post-action tool.
+                        No modification will be done to the passed args, so
+                        take care of escaping.
+    -m --simple         Treat post-action as a simple tool which does not
+                        support the protocol messages specified above. No
+                        sync is possible in that case and all stdout and
+                        stderr will be passed back untouched to orgalorg.
+                        Excludes '--post-action'.
+
+Timeout options:
+    --conn-timeout <msec>
+                        Remote host connection timeout in milliseconds.
+                        [default: 10000]
+    --send-timeout <msec>
+                        Remote host connection data sending timeout in
+                        milliseconds. [default: 10000]
+    --recv-timeout <msec>
+                        Remote host connection data receiving timeout in
+                        milliseconds. [default: 10000]
+    --keep-alive <msec> How long to keep the connection alive after session
+                        end, in milliseconds. [default: 60000]
+
+Control commands:
+
+    SYNC                Causes orgalorg to broadcast 'SYNC <node>' to all
+                        connected nodes.
`
 
 const (
-	defaultSSHPort = 22
+	defaultSSHPort    = 22
+	SSHPasswordPrompt = "Password: "
 )
 
 var (
-	log = lorg.NewLog()
+	logger = lorg.NewLog()
 )
 
 func main() {
+	logger.SetFormat(lorg.NewFormat("* ${time} ${level:[%s]:left} %s"))
+
 	currentUser, err := user.Current()
 	if err != nil {
-		log.Fatal(hierr.Errorf(
+		logger.Fatal(hierr.Errorf(
 			err,
 			`can't get current user`,
 		))
 	}
 
-	log.SetLevel(lorg.LevelDebug)
+	logger.SetLevel(lorg.LevelDebug)
 
 	usage := strings.Replace(usage, "$USER", currentUser.Username, -1)
+	usage = strings.Replace(usage, "$HOME", currentUser.HomeDir, -1)
+	usage = strings.Replace(usage, "$RUNID", generateRunID(), -1)
 
 	args, err := docopt.Parse(usage, nil, true, version, false)
 	if err != nil {
 		panic(err)
 	}
 
 	switch {
-	case args["--stop-after-lock"].(bool):
+	case args["--stop-at-lock"].(bool):
 		fallthrough
-	case args["-S"].(bool):
+	case args["--sync"].(bool):
 		err = synchronize(args)
 	}
 
-	log.Fatal(err)
+	if err != nil {
+		logger.Fatal(err)
+	}
 }
 
 func synchronize(args map[string]interface{}) error {
 	var (
-		lockOnly = args["--stop-after-lock"].(bool)
+		SSHKeyPath, _ = args["--key"].(string)
+		lockOnly      = args["--stop-at-lock"].(bool)
+		fileSources   = args["<files>"].([]string)
+		relative      = args["--relative"].(bool)
 	)
 
-	_, err := acquireDistributedLock(args)
+	addresses, err := parseAddresses(args)
+	if err != nil {
+		return hierr.Errorf(
+			err,
+			`can't parse all specified addresses`,
+		)
+	}
+
+	timeouts, err := makeTimeouts(args)
+	if err != nil {
+		return hierr.Errorf(
+			err,
+			`can't parse SSH connection timeouts`,
+		)
+	}
+
+	filesList := []string{}
+	if !lockOnly {
+		logger.Info(`building files list`)
+
+		filesList, err = getFilesList(relative, fileSources...)
+		if err != nil {
+			return hierr.Errorf(
+				err,
+				`can't obtain files list to sync from localhost`,
+			)
+		}
+
+		logger.Infof(`file list contains %d files`, len(filesList))
+	}
+
+	var runnerFactory runnerFactory
+
+	switch {
+	case SSHKeyPath != "":
+		runnerFactory = createRemoteRunnerFactoryWithKey(
+			SSHKeyPath,
+			timeouts,
+		)
+
+	default:
+		runnerFactory = createRemoteRunnerFactoryWithAskedPassword(
+			SSHPasswordPrompt,
+			timeouts,
+		)
+	}
+
+	cluster, err := acquireDistributedLock(args, runnerFactory, addresses)
 	if err != nil {
 		return hierr.Errorf(
 			err,
@@ -169,8 +244,10 @@ func synchronize(args map[string]interface{}) error {
 		)
 	}
 
+	logger.Infof(`global lock acquired on %d nodes`, len(cluster.nodes))
+
 	if lockOnly {
-		log.Info("--stop-after-lock was passed, waiting for interrupt...")
+		logger.Info("--stop-at-lock was passed, waiting for interrupt...")
 
 		wait := sync.WaitGroup{}
 		wait.Add(1)
@@ -179,55 +256,64 @@ func synchronize(args map[string]interface{}) error {
 		os.Exit(0)
 	}
 
+	receivers, err := startArchiveReceivers(cluster, args)
+	if err != nil {
+		return hierr.Errorf(
+			err,
+			`can't start archive receivers on the cluster`,
+		)
+	}
+
+	logger.Info(`file upload started`)
+
+	err = archiveFilesToWriter(receivers.stdin, filesList)
+	if err != nil {
+		return hierr.Errorf(
+			err,
+			`can't archive files and send to the remote nodes`,
+		)
+	}
+
+	logger.Info(`waiting for file upload to finish`)
+
+	err = receivers.wait()
+	if err != nil {
+		return hierr.Errorf(
+			err,
+			`can't finish files upload`,
+		)
+	}
+
+	logger.Info(`upload done`)
+
 	return nil
 }
 
-func acquireDistributedLock(
-	args map[string]interface{},
-) (*distributedLock, error) {
+func parseAddresses(args map[string]interface{}) ([]address, error) {
 	var (
 		defaultUser = args["--user"].(string)
-		targets     = args["-o"].([]string)
-		rootDir     = args["--root"].(string)
+		hosts       = args["-o"].([]string)
 	)
 
-	lock := &distributedLock{}
+	addresses := []address{}
 
-	for _, host := range targets {
-		user, domain, port, err := parseAddress(
+	for _, host := range hosts {
+		address, err := parseAddress(
 			host, defaultUser, defaultSSHPort,
 		)
 		if err != nil {
 			return nil, hierr.Errorf(
 				err,
-				`can't parse specified host: '%s'`, host,
-			)
-		}
-
-		runner, err := runcmd.NewLocalRunner()
-		if err != nil {
-			return nil, hierr.Errorf(
-				err,
-				`can't create runner for host: '%s'`, host,
+				`can't parse specified address '%s'`,
+				host,
 			)
 		}
 
-		err = lock.addNodeRunner(runner, user, domain, port)
-		if err != nil {
-			return nil, hierr.Errorf(
-				err,
-				`can't add host to the global cluster lock: '%s'`, host,
-			)
-		}
+		addresses = append(addresses, address)
 	}
 
-	err := lock.acquire(rootDir)
-	if err != nil {
-		return nil, hierr.Errorf(
-			err,
-			`can't acquire global cluster lock on %d hosts`,
-			len(targets),
-		)
-	}
+	return uniqAddresses(addresses), nil
+}
 
-	return lock, nil
+func generateRunID() string {
+	return time.Now().Format("20060102150405.999999")
 }
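The usage text describes the post-action protocol only in prose; a minimal post-action tool honoring it could look like the hypothetical script below (name, phase layout and node count are invented for illustration, and it assumes orgalorg delivers the 'SYNC <node>' replies on the tool's stdin):

    #!/bin/bash
    # $1 - prefix for all regular output; unprefixed lines are control commands
    # $2 - directory containing the received files
    prefix="$1"
    files_dir="$2"

    echo "$prefix installing files from $files_dir"

    # report this phase as finished
    echo "SYNC"

    # wait for every other node to reach the same point;
    # orgalorg sends one 'SYNC <node>' line back per synced node
    nodes_total=3
    for (( i = 0; i < nodes_total; i++ )); do
        read -r sync_reply
    done

    echo "$prefix all nodes synced, continuing"
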
-ne 0 ]; then exit 1 fi -go build - progress:spinner:new _progress_spinner -:hastur:init "$_progress_spinner" openssh,pam +{ + :build:init + + hastur:init openssh,pam,util-linux,tar + +} 2> >(progress:spinner:spin "$_progress_spinner" > /dev/null) :cleanup() { - :hastur:cleanup + containers:wipe progress:spinner:stop "$_progress_spinner" } diff --git a/runner_factory.go b/runner_factory.go new file mode 100644 index 0000000..b59e2c5 --- /dev/null +++ b/runner_factory.go @@ -0,0 +1,41 @@ +package main + +import ( + "fmt" + + "github.com/theairkit/runcmd" +) + +type ( + runnerFactory func(address address) (runcmd.Runner, error) +) + +func createRemoteRunnerFactoryWithKey( + key string, + timeouts *runcmd.Timeouts, +) runnerFactory { + return func(address address) (runcmd.Runner, error) { + return runcmd.NewRemoteKeyAuthRunnerWithTimeouts( + address.user, + fmt.Sprintf("%s:%d", address.domain, address.port), + key, + *timeouts, + ) + } +} + +func createRemoteRunnerFactoryWithAskedPassword( + prompt string, + timeouts *runcmd.Timeouts, +) runnerFactory { + // TODO ask password + password := "" + return func(address address) (runcmd.Runner, error) { + return runcmd.NewRemotePassAuthRunnerWithTimeouts( + address.user, + fmt.Sprintf("%s:%d", address.domain, address.port), + password, + *timeouts, + ) + } +} diff --git a/tests/build.sh b/tests/build.sh new file mode 100644 index 0000000..f725232 --- /dev/null +++ b/tests/build.sh @@ -0,0 +1,10 @@ +:build:init() { + printf "[build] building go binary... " + + if build_out=$(go build -o orgalorg -v 2>&1 | tee /dev/stderr); then + printf "ok.\n" + else + printf "fail.\n\n%s\n" "$build_out" + return 1 + fi +} diff --git a/tests/containers.sh b/tests/containers.sh deleted file mode 100644 index 2c3fe63..0000000 --- a/tests/containers.sh +++ /dev/null @@ -1,68 +0,0 @@ -export _containers_count=${_containers_count:-1} - -:containers:set-count() { - _containers_count=$1 -} - -:containers:count() { - echo "$_containers_count" -} - -:containers:spawn() { - :hastur -p $(:hastur:get-packages) -kS "${@:-/bin/true}" -} - -:containers:destroy() { - local container_name=$1 - - :hastur -f -D "$container_name" -} - -:containers:list() { - :hastur -Qc | awk '{ print $1 }' -} - -:containers:wipe() { - :list-containers | while read container_name; do - :containers:destroy "$container_name" - done -} - -:containers:run() { - local container_name=$1 - shift - - :containers:spawn -n "$container_name" "${@}" -} - -:containers:get-list() { - local var_name="$1" - - eval "$var_name=()" - while read "container_name"; do - eval "$var_name+=($container_name)" - done < <(:containers:list) -} - -:containers:get-ip-list() { - local var_name="$1" - - local ip - - eval "$var_name=()" - while read "container_name"; do - ip=$(:containers:get-ip "$container_name") - eval "$var_name+=($ip)" - done < <(:containers:list) -} - - -:containers:get-ip() { - local container_name="$1" - - :hastur -Q "$container_name" --ip | cut -f1 -d/ -} - -:containers:is-active() { - :hastur:is-active "${@}" -} diff --git a/tests/hastur.sh b/tests/hastur.sh deleted file mode 100644 index 2907228..0000000 --- a/tests/hastur.sh +++ /dev/null @@ -1,86 +0,0 @@ -# FIXME make it possible to specify non-system root dir -export _hastur_root_dir=${_hastur_root_dir:-/var/lib/hastur} - -export _hastur_packages=${_hastur_packages:-bash,coreutils,shadow} - -:hastur:keep-containers() { - :hastur:destroy-containers() { - echo -n "containers are kept in $_hastur_root_dir... 
" - } - - :hastur:destroy-root() { - : - } -} - -:hastur:keep-images() { - :hastur:destroy-root() { - echo -n "root is kept in $_hastur_root_dir... " - } -} - -:hastur:get-packages() { - echo $_hastur_packages -} - -:hastur() { - :sudo hastur -q -r $_hastur_root_dir "${@}" -} - -:hastur:init() { - local progress_indicator=$1 - shift - - printf "Cheking and initializing hastur... " - - mkdir -p $_hastur_root_dir - - _hastur_packages=$_hastur_packages",$1" - - local hastur_out - - if hastur_out=$( - :hastur -p $_hastur_packages -S /usr/bin/true 2>&1 \ - | progress:spinner:spin "$progress_indicator" - ) - then - printf "ok.\n" - else - printf "fail.\n\n%s\n" "$hastur_out" - return 1 - fi -} - -:hastur:destroy-containers() { - :hastur -Qc \ - | awk '{ print $1 }' \ - | while read container_name; do - :hastur -f -D $container_name - done -} - -:hastur:destroy-root() { - :hastur --free -} - -:hastur:cleanup() { - printf "Cleaning up hastur containers...\n" - - tests:debug() { - echo "${@}" >&2 - } - - :hastur:destroy-containers - - :hastur:destroy-root - - printf "ok.\n" -} - -:hastur:is-active() { - local container_name=$1 - shift - - :sudo:silent hastur -q -r $_hastur_root_dir -Q "$container_name" --ip \ - 2>/dev/null >/dev/null -} diff --git a/tests/orgalorg.sh b/tests/orgalorg.sh new file mode 100644 index 0000000..96e5b54 --- /dev/null +++ b/tests/orgalorg.sh @@ -0,0 +1,7 @@ +# requires setup.sh to be sourced first! + +orgalorg_user="orgalorg" + +:orgalorg-key() { + orgalorg -u $orgalorg_user ${ips[*]/#/-o} -k "$(:ssh:get-key)" "${@}" +} diff --git a/tests/setup.sh b/tests/setup.sh index b7cdebf..317d2eb 100644 --- a/tests/setup.sh +++ b/tests/setup.sh @@ -1,23 +1,21 @@ -tests:clone orgalorg bin/ +tests:clone "orgalorg" "bin/" -tests:debug "!!! spawning $(:containers:count) containers" +tests:debug "!!! spawning $(containers:count) containers" -for (( i = $(:containers:count); i > 0; i-- )); do - tests:ensure :containers:spawn -done +tests:ensure containers:spawn tests:debug "!!! generating local key pair" tests:ensure :ssh:keygen-local "$(:ssh:get-key)" -:containers:get-list "containers" +containers:get-list "containers" tests:debug "!!! bootstrapping containers" for container_name in "${containers[@]}"; do tests:ensure :ssh:keygen-remote "$container_name" - tests:ensure :containers:run "$container_name" -- \ + tests:ensure containers:run "$container_name" -- \ /usr/bin/sh -c " /usr/bin/useradd -G wheel $(:ssh:get-username) @@ -36,14 +34,14 @@ tests:debug "!!! running SSH daemon on containers" for container_name in "${containers[@]}"; do tests:run-background "pid" :ssh:run-daemon "$container_name" "-D" - while ! :containers:is-active "$container_name" :; do + while ! 
containers:is-active "$container_name"; do tests:debug "[$container_name] is offline" done tests:debug "[$container_name] is online" done -:containers:get-ip-list "ips" +containers:get-ip-list "ips" for container_index in "${!containers[@]}"; do container_name=${containers[$container_index]} diff --git a/tests/ssh.sh b/tests/ssh.sh index eb34c98..f09708d 100644 --- a/tests/ssh.sh +++ b/tests/ssh.sh @@ -10,7 +10,7 @@ local container_name=$1 shift - :containers:run "$container_name" -- \ + containers:run "$container_name" -- \ /usr/bin/sshd "${@:--Dd}" } @@ -25,7 +25,7 @@ local container_name=$1 shift - :containers:run "$container_name" -- \ + containers:run "$container_name" -- \ /usr/bin/ssh-keygen "${@:--A}" } @@ -34,7 +34,7 @@ local username=$2 shift - :containers:run "$container_name" -- \ + containers:run "$container_name" -- \ /usr/bin/tee -a /home/$username/.ssh/authorized_keys > /dev/null } diff --git a/tests/sudo.sh b/tests/sudo.sh deleted file mode 100644 index cd23071..0000000 --- a/tests/sudo.sh +++ /dev/null @@ -1,18 +0,0 @@ -if ! type tests:debug &>/dev/null; then - tests:debug() { - echo "${@}" - } -fi - -function :sudo() { - { - printf "\e[1;31m{sudo} $ %s\e[0m\n" "$1" - printf "\e[1;31m . %s\e[0m\n" "${@:2}" - } | _tests_indent - - :sudo:silent "${@}" -} - -function :sudo:silent() { - command sudo -n "${@}" -} diff --git a/tests/teardown.sh b/tests/teardown.sh new file mode 100644 index 0000000..2c75eaf --- /dev/null +++ b/tests/teardown.sh @@ -0,0 +1 @@ +containers:wipe diff --git a/tests/testcases/can-acquire-global-lock.test.sh b/tests/testcases/can-acquire-global-lock.test.sh index bd78f95..9813a8f 100644 --- a/tests/testcases/can-acquire-global-lock.test.sh +++ b/tests/testcases/can-acquire-global-lock.test.sh @@ -1,10 +1,8 @@ -#tests:silence tests:eval :ssh "${ips[0]}" pwd - orgalorg_output="$(tests:get-tmp-dir)/oralorg.stdout" tests:run-background orgalorg_pid \ tests:silence tests:pipe \ - "orgalorg ${ips[@]/#/-o} --stop-after-lock 2>&1 | tee $orgalorg_output" + :orgalorg-key --stop-at-lock '2>&1' '|' tee $orgalorg_output while ! cat "$orgalorg_output" 2>/dev/null | grep -qF "global lock acquired" do @@ -14,9 +12,11 @@ done tests:debug "[orgalorg] global lock has been acquired" -tests:not tests:ensure orgalorg ${ips[@]/#/-o} --stop-after-lock +tests:not tests:ensure :orgalorg-key --stop-at-lock tests:assert-stderr "lock already" pkill -INT -P $! -wait $! || true +_exited_with_ctrl_c=130 + +wait $! || tests:assert-equals "$_exited_with_ctrl_c" "$?" diff --git a/tests/testcases/can-upload-directory.test.sh b/tests/testcases/can-upload-directory.test.sh new file mode 100644 index 0000000..e020c93 --- /dev/null +++ b/tests/testcases/can-upload-directory.test.sh @@ -0,0 +1,22 @@ +tests:make-tmp-dir dir + +tests:put dir/test-file <