Merge pull request #11 in DEVOPS/orgalorg from serial-output to master
* commit '48446c01fd81ac46f504ad74ec8e3dc6cfea5352':
  fix typos
  migrate to loreley TTY detection
  switch to colorgful & barely, add upload status
  wip
  make tests pass and update readme
  update readme
  refactor styles
  coloooors
  fix lock issues
Селецкий Станислав committed Jul 11, 2016
2 parents 4a40ec8 + 48446c0 commit 360d99b
Showing 29 changed files with 917 additions and 251 deletions.
116 changes: 94 additions & 22 deletions README.md
@@ -1,27 +1,37 @@
# orgalorg

Ultimate parallel cluster file synchronization tool and SSH commands
executioner.

![demo](https://raw.githubusercontent.com/reconquest/orgalorg/status-line/demo.gif)

# Features

* Zero-configuration. No config files.

* Running SSH commands or shell scripts on any number of hosts in parallel. All
  output from the nodes is returned back, preserving the stdout and stderr
  stream mapping of the original commands.

* Synchronizing files and directories across the cluster with prior global
  cluster locking. After synchronization is done, an arbitrary command can be
  evaluated.

* Synchronizing files and directories with a subsequent run of a complex
  multi-step scenario, with steps synchronized across the cluster.

* User-friendly progress indication.

* Both strict and loose failover modes, to be sure that everything will either
  fail on any error or try to complete, no matter what.

* Interactive password authentication as well as SSH public key authentication.

* Ability to run commands through `sudo`.

* Grouped mode of output, so stdout and stderr from the nodes are grouped by
  node name. Alternatively, output can be streamed back as soon as a node
  returns something.

# Example usages

@@ -30,19 +40,19 @@ host-specification arguments, like `-o host-a -o host-b`.

## Obtaining global cluster lock

```bash
orgalorg -o <host>... -L
```

## Obtaining global cluster lock on custom directory

```bash
orgalorg -o <host>... -L -r /etc
```

## Evaluating command on hosts in parallel

```bash
orgalorg -o <host>... -C uptime
```

@@ -51,34 +61,66 @@ orgalorg -o <host>... -C uptime
`axfr` is a tool of your choice for retrieving domain information from your
infrastructure DNS.

```bash
axfr | grep phpnode | orgalorg -s -C uptime
```

## Evaluating command under root (passwordless sudo required)

```bash
orgalorg -o <host>... -x -C whoami
```

## Copying SSH public key for remote authentication

```bash
orgalorg -o <host>... -p -i ~/.ssh/id_rsa.pub -C tee -a ~/.ssh/authorized_keys
```

## Synchronizing configs and then reloading service (like nginx)

```bash
orgalorg -o <host>... -xn 'systemctl reload nginx' -S /etc/nginx.conf
```

## Evaluating shell script

```bash
orgalorg -o <host>... -i script.bash -C bash
```

## Install package on all nodes and get combined output from each node

```bash
orgalorg -o <host>... -lx -C pacman -Sy my-package --noconfirm
```

## Evaluating shell oneliner

```bash
orgalorg -o <host>... -C sleep '$(($RANDOM % 10))' '&&' echo done
```

# Description

orgalorg provides an easy way to synchronize files across a cluster and to run
arbitrary SSH commands.

orgalorg works through SSH & tar, so no unexpected protocol errors will arise.

In the default mode of operation (later referred to as sync mode), orgalorg
will perform the following steps in order:

1. Acquire a global cluster lock (more detailed info below).
2. Create, upload and extract the specified files in streaming mode to the
   specified nodes into a temporary run directory.
3. Start a synchronization tool on each node, which relocates the files from
   the temporary run directory to the destination.

So, orgalorg is expected to work with a third-party synchronization tool that
does the actual file relocation and can be quite intricate, **but orgalorg can
work without such a tool and perform a simple file sync (more on this later)**.
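Step 2 above can be sketched in Go with `archive/tar` and an `io.Pipe`: the sender writes tar entries into one end while the receiver extracts them from the other, so nothing is buffered in full. This is a minimal standalone illustration, not orgalorg's actual code; the file name and contents are hypothetical, and orgalorg streams over an SSH session rather than an in-process pipe.

```go
package main

import (
	"archive/tar"
	"fmt"
	"io"
)

// streamOneFile tars a single in-memory file through an io.Pipe and
// extracts it on the other side, returning the received name and size.
func streamOneFile(name string, body []byte) (string, int) {
	reader, writer := io.Pipe()

	// Sender: write the archive into the pipe as it is produced.
	go func() {
		archive := tar.NewWriter(writer)
		archive.WriteHeader(&tar.Header{
			Name: name,
			Mode: 0644,
			Size: int64(len(body)),
		})
		archive.Write(body)
		archive.Close()
		writer.Close()
	}()

	// Receiver: extract entries as they arrive.
	unarchive := tar.NewReader(reader)
	header, err := unarchive.Next()
	if err != nil {
		panic(err)
	}
	content, _ := io.ReadAll(unarchive)
	return header.Name, len(content)
}

func main() {
	// "etc/nginx.conf" is just an example payload.
	name, size := streamOneFile("etc/nginx.conf", []byte("worker_processes 4;\n"))
	fmt.Printf("received %s (%d bytes)\n", name, size)
}
```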


## Global Cluster Lock

@@ -251,3 +293,33 @@ continue to the next step of execution process.
<- ORGALORG:132464327653 SYNC [user@node2:1234] phase 1 completed
```

# Testing

To run the tests, it's enough to run:

```bash
./run_tests
```

## Requirements

Testcases are run through the [tests.sh](https://github.com/reconquest/tests.sh)
library.

For every testcase, a new set of temporary containers is initialized through
[hastur](https://github.com/seletskiy/hastur), so `systemd` is required to run
the test suite.

orgalorg testcases are as close to reality as possible, so orgalorg will really
connect via SSH to a cluster of containers in each testcase.

## Coverage

Run the following command to calculate total coverage (available after running
the testsuite):

```bash
make coverage.total
```

Current coverage is around **85%**.
65 changes: 55 additions & 10 deletions archive.go
@@ -18,6 +18,7 @@ func startArchiveReceivers(
cluster *distributedLock,
rootDir string,
sudo bool,
serial bool,
) (*remoteExecution, error) {
var (
command = []string{}
@@ -39,7 +40,7 @@

logMutex := &sync.Mutex{}

runner := &remoteExecutionRunner{command: command, serial: serial}

execution, err := runner.run(
cluster,
@@ -70,7 +71,7 @@

func archiveFilesToWriter(
target io.WriteCloser,
files []file,
preserveUID, preserveGID bool,
) error {
workDir, err := os.Getwd()
@@ -81,17 +82,55 @@
)
}

status := &struct {
Phase string
Total int
Fails int
Success int
Written bytesStringer
Bytes bytesStringer
}{
Phase: "upload",
Total: len(files),
}

setStatus(status)

for _, file := range files {
status.Bytes.Amount += file.size
}

archive := tar.NewWriter(target)
stream := io.MultiWriter(archive, callbackWriter(
func(data []byte) (int, error) {
status.Written.Amount += len(data)

err = bar.Render(os.Stderr)
if err != nil {
errorf(
`%s`,
hierr.Errorf(
err,
`can't render status bar`,
),
)
}

return len(data), nil
},
))

for fileIndex, file := range files {
infof(
"%5d/%d sending file: '%s'",
fileIndex+1,
len(files),
file.path,
)

err = writeFileToArchive(
file.path,
stream,
archive,
workDir,
preserveUID,
@@ -101,9 +140,11 @@
return hierr.Errorf(
err,
`can't write file to archive: '%s'`,
file.path,
)
}

status.Success++
}

tracef("closing archive stream, %d files sent", len(files))
@@ -129,6 +170,7 @@

func writeFileToArchive(
fileName string,
stream io.Writer,
archive *tar.Writer,
workDir string,
preserveUID, preserveGID bool,
@@ -207,7 +249,7 @@
)
}

_, err = io.Copy(stream, fileToArchive)
if err != nil {
return hierr.Errorf(
err,
@@ -219,8 +261,8 @@
return nil
}

func getFilesList(relative bool, sources ...string) ([]file, error) {
files := []file{}

for _, source := range sources {
err := filepath.Walk(
@@ -245,7 +287,10 @@
}
}

files = append(files, file{
path: path,
size: int(info.Size()),
})

return nil
},
33 changes: 33 additions & 0 deletions bytes_stringer.go
@@ -0,0 +1,33 @@
package main

import (
"fmt"
)

type bytesStringer struct {
Amount int
}

func (stringer bytesStringer) String() string {
amount := float64(stringer.Amount)

suffixes := map[string]string{
"b": "KiB",
"KiB": "MiB",
"MiB": "GiB",
"GiB": "TiB",
}

suffix := "b"
for amount >= 1024 {
if newSuffix, ok := suffixes[suffix]; ok {
suffix = newSuffix
} else {
break
}

amount /= 1024
}

return fmt.Sprintf("%.2f%s", amount, suffix)
}
9 changes: 9 additions & 0 deletions callback_writer.go
@@ -0,0 +1,9 @@
package main

type (
callbackWriter func([]byte) (int, error)
)

func (writer callbackWriter) Write(data []byte) (int, error) {
return writer(data)
}
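`callbackWriter` adapts a plain function into an `io.Writer`, which is how archive.go tees the tar stream through a byte counter via `io.MultiWriter`. A minimal sketch of the same trick (standalone copy of the type; the counting logic is illustrative, not orgalorg's status-bar code):

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

type callbackWriter func([]byte) (int, error)

func (writer callbackWriter) Write(data []byte) (int, error) {
	return writer(data)
}

func main() {
	written := 0

	// Everything copied to `out` also passes through the counting
	// callback, so progress can be tracked without touching the
	// primary destination (io.Discard stands in for the archive).
	out := io.MultiWriter(io.Discard, callbackWriter(
		func(data []byte) (int, error) {
			written += len(data)
			return len(data), nil
		},
	))

	io.Copy(out, strings.NewReader("hello, cluster"))
	fmt.Println(written) // 14
}
```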