Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
seletskiy committed Jun 8, 2016
1 parent 4f69c13 commit a6f719a
Show file tree
Hide file tree
Showing 49 changed files with 938 additions and 398 deletions.
10 changes: 5 additions & 5 deletions utils_test.go → address_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"github.com/stretchr/testify/assert"
)

func TestCanParseDomainValidDomainAddressWithDefaults(t *testing.T) {
func TestParseAddress_ParseValidDomainAddressWithDefaults(t *testing.T) {
assert := assert.New(t)

tests := []struct {
Expand Down Expand Up @@ -38,13 +38,13 @@ func TestCanParseDomainValidDomainAddressWithDefaults(t *testing.T) {
}

for _, test := range tests {
user, domain, port, err := parseAddress(
address, err := parseAddress(
test.Host, test.DefaultUser, test.DefaultPort,
)

assert.Nil(err)
assert.Equal(test.ExpectedUser, user)
assert.Equal(test.ExpectedPort, port)
assert.Equal(test.ExpectedDomain, domain)
assert.Equal(test.ExpectedUser, address.user)
assert.Equal(test.ExpectedPort, address.port)
assert.Equal(test.ExpectedDomain, address.domain)
}
}
51 changes: 45 additions & 6 deletions archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@ import (
"io"
"os"
"path/filepath"
"sync"
"syscall"

"github.com/reconquest/go-lineflushwriter"
"github.com/reconquest/go-prefixwriter"
"github.com/seletskiy/hierr"
)

Expand All @@ -25,12 +28,33 @@ func startArchiveReceivers(
archiveReceiverCommand = append(
archiveReceiverCommand,
[]string{
`tar`, `-x`, `--verbose`, `--directory`,
rootDir,
`tar`, `-x`, `--directory`, rootDir,
}...,
)

execution, err := runRemoteExecution(lockedNodes, archiveReceiverCommand)
if verbose >= verbosityDebug {
archiveReceiverCommand = append(archiveReceiverCommand, `--verbose`)
}

logMutex := &sync.Mutex{}

execution, err := runRemoteExecution(
lockedNodes,
archiveReceiverCommand,
func(node *remoteExecutionNode) {
node.stdout = lineflushwriter.New(
prefixwriter.New(node.stdout, "{tar} "),
logMutex,
true,
)

node.stderr = lineflushwriter.New(
prefixwriter.New(node.stderr, "{tar} "),
logMutex,
true,
)
},
)
if err != nil {
return nil, hierr.Errorf(
err,
Expand All @@ -43,7 +67,7 @@ func startArchiveReceivers(
}

func archiveFilesToWriter(
target io.Writer,
target io.WriteCloser,
files []string,
preserveUID, preserveGID bool,
) error {
Expand All @@ -57,20 +81,27 @@ func archiveFilesToWriter(

archive := tar.NewWriter(target)
for fileIndex, fileName := range files {
logger.Infof(
infof(
"%5d/%d sending file: '%s'",
fileIndex+1,
len(files),
fileName,
)

writeFileToArchive(
err := writeFileToArchive(
fileName,
archive,
workDir,
preserveUID,
preserveGID,
)
if err != nil {
return hierr.Errorf(
err,
`can't write file to archive: '%s'`,
fileName,
)
}
}

tracef("closing archive stream, %d files sent", len(files))
Expand All @@ -83,6 +114,14 @@ func archiveFilesToWriter(
)
}

err = target.Close()
if err != nil {
return hierr.Errorf(
err,
`can't close target stdin`,
)
}

return nil
}

Expand Down
73 changes: 42 additions & 31 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@ import (
"strings"
"sync"

"github.com/reconquest/go-lineflushwriter"
"github.com/reconquest/go-prefixwriter"
"github.com/seletskiy/hierr"
)

func runRemoteExecution(
lockedNodes *distributedLock,
command []string,
callback func(*remoteExecutionNode),
) (*remoteExecution, error) {
var (
stdins = []io.WriteCloser{}
Expand Down Expand Up @@ -45,6 +48,23 @@ func runRemoteExecution(
return
}

if callback != nil {
callback(remoteNode)
}

remoteNode.command.SetStdout(remoteNode.stdout)
remoteNode.command.SetStderr(remoteNode.stderr)

err = remoteNode.command.Start()
if err != nil {
errors <- hierr.Errorf(
err,
`can't start remote command`,
)

return
}

nodesMapMutex.Lock()
{
stdins = append(stdins, remoteNode.stdin)
Expand All @@ -67,7 +87,7 @@ func runRemoteExecution(
}

return &remoteExecution{
stdin: multiWriteCloser{stdins},
stdin: &multiWriteCloser{stdins},

nodes: remoteNodes,
}, nil
Expand All @@ -89,52 +109,51 @@ func runRemoteExecutionNode(
var stdout io.WriteCloser
var stderr io.WriteCloser
switch verbose {
default:
stdout = newLineFlushWriter(
logMutex,
newPrefixWriter(
os.Stdout,
case verbosityQuiet:
stdout = lineflushwriter.New(nopCloser{os.Stdout}, logMutex, false)
stderr = lineflushwriter.New(nopCloser{os.Stderr}, logMutex, false)

case verbosityNormal:
stdout = lineflushwriter.New(
prefixwriter.New(
nopCloser{os.Stdout},
node.address.domain+" ",
),
logMutex,
true,
)

stderr = newLineFlushWriter(
logMutex,
newPrefixWriter(
os.Stderr,
stderr = lineflushwriter.New(
prefixwriter.New(
nopCloser{os.Stderr},
node.address.domain+" ",
),
logMutex,
true,
)

case verbosityQuiet:
stdout = newLineFlushWriter(logMutex, os.Stdout, false)
stderr = newLineFlushWriter(logMutex, os.Stderr, false)

default:
fallthrough
case verbosityDebug:
stdout = newLineFlushWriter(
logMutex,
newPrefixWriter(
stdout = lineflushwriter.New(
prefixwriter.New(
newDebugWriter(logger),
node.String()+" {cmd} <stdout> ",
),
logMutex,
false,
)

stderr = newLineFlushWriter(
logMutex,
newPrefixWriter(
stderr = lineflushwriter.New(
prefixwriter.New(
newDebugWriter(logger),
node.String()+" {cmd} <stderr> ",
),
logMutex,
false,
)
}

remoteCommand.SetStdout(stdout)
remoteCommand.SetStderr(stderr)

stdin, err := remoteCommand.StdinPipe()
if err != nil {
return nil, hierr.Errorf(
Expand All @@ -143,14 +162,6 @@ func runRemoteExecutionNode(
)
}

err = remoteCommand.Start()
if err != nil {
return nil, hierr.Errorf(
err,
`can't start remote command`,
)
}

return &remoteExecutionNode{
node: node,
command: remoteCommand,
Expand Down
7 changes: 7 additions & 0 deletions debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,10 @@ func infof(format string, args ...interface{}) {

logger.Infof(format, args...)
}

func warningf(format string, args ...interface{}) {
loggerLock.Lock()
defer loggerLock.Unlock()

logger.Warningf(format, args...)
}
4 changes: 4 additions & 0 deletions debug_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,7 @@ func (writer debugWriter) Write(data []byte) (int, error) {

return len(data), nil
}

func (writer debugWriter) Close() error {
return nil
}
21 changes: 19 additions & 2 deletions distributed_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ package main

import (
"strings"
"time"

"github.com/seletskiy/hierr"
"github.com/theairkit/runcmd"
)

type distributedLock struct {
nodes []*distributedLockNode

noFail bool
}

func (lock *distributedLock) addNodeRunner(
Expand All @@ -32,9 +35,17 @@ func (lock *distributedLock) acquire(filename string) error {
node.String(),
)

_, err := node.lock(filename)

err := node.lock(filename)
if err != nil {
if lock.noFail {
warningf(
hierr.Errorf(
err,
`failed to acquire lock, but continuing execution`,
).Error(),
)
}

nodes := []string{}
for _, node := range lock.nodes {
nodes = append(nodes, node.String())
Expand All @@ -51,3 +62,9 @@ func (lock *distributedLock) acquire(filename string) error {

return nil
}

func (lock *distributedLock) runHeartbeats(period time.Duration) {
for _, node := range lock.nodes {
go heartbeat(period, node)
}
}
Loading

0 comments on commit a6f719a

Please sign in to comment.